Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Randall, That's a fair assessment; if a user upgrades their cluster to 3.0 with no changes to worker or connector configs, it's possible that their cluster will break if their worker principal(s) lack the necessary ACLs on the Kafka cluster that hosts the config topic. If we wanted to take a more conservative approach, we could allow users to opt in to the use of a transactional producer by their cluster's leader through some worker configuration property. The rolling upgrade process from some pre-3.0 cluster to a 3.0+ cluster with exactly-once source support enabled would become: 1. Upgrade cluster to 3.0 (or a later version, if one is available) 2. Enable the use of a transactional producer by the cluster's leader 3. Enable exactly-once source support Since steps 1 and 2 could take place within the same rolling upgrade, the number of rolling upgrades for this new approach would be the same as the current approach: two. The only downside would be additional configuration complexity for the worker, and the upgrade process itself would be a little trickier for users (and potentially more error-prone). In order to reduce the added configuration complexity as much as possible, we could expose this intermediate state (workers are on 3.0 and the leader uses a transactional producer, but exactly-once source support is not enabled) by renaming the "exactly.once.source.enabled" property to "exactly.once.source.support", and permitting values of "disabled" (default), "preparing", and "enabled". The "preparing" and "enabled" values would provide the same behavior as the current proposal with "exactly.once.source.enabled" set to "false" and "true", respectively, and "disabled" would have the same behavior as the current proposal, except without the use of a transactional producer by the leader. I'll update the proposal with this new behavior shortly. Thanks for the review! Cheers, Chris On Wed, Jun 9, 2021 at 1:02 PM Randall Hauch wrote: > Chris, > > Sorry for the late question/comment. But the "Breaking Changes" concerns > me. IIUC, when a user upgrades their 1.x or 2.x Connect cluster, then when > they restart their 3.0 worker(s) the workers will fail due to this producer > requirement even if they make no changes to their worker configs or > connector configs. Is this correct? > > If so, I'm concerned about this. Even though the additional producer ACLs > are seemingly minor and easy to change, it is likely that users will not > read the docs before they upgrade, causing their simple upgrade to fail. > And even though in 3.0 we could allow ourselves to cause breaking changes > with a major release, I personally would prefer we not have any such > breaking changes. > > Given that, what would be required for us to eliminate that breaking > change, or change it from a breaking change to a prerequisite for enabling > EOS support in their cluster? > > Thanks, > > Randall > > On Wed, Jun 2, 2021 at 8:42 AM Chris Egerton > wrote: > > > Hi Tom, > > > > I do agree that it'd be safer to default to "required", but since at the > > time of the 3.0 release no existing connectors will have implemented the > > "SourceConnector::exactlyOnceSupport" method, it'd require all Connect > > users to downgrade to "requested" anyways in order to enable exactly-once > > support on their workers. 
The friction there seems a little excessive; we > > might consider changing the default from "requested" to "required" later > on > > down the line after connector developers have had enough time to put out > > new connector versions that implement the new API. Thoughts? > > > > Cheers, > > > > Chris > > > > On Wed, Jun 2, 2021 at 8:49 AM Tom Bentley wrote: > > > > > Hi Chris, > > > > > > Just a minor question: I can see why the default for > exactly.once.support > > > is requested (you want a good first-run experience, I assume), but > it's a > > > little like engineering a safety catch and then not enabling it. > Wouldn't > > > it be safer to default to required, so that there's no way someone can > > > mistakenly not get EoS without explicitly having configured it? > > > > > > Thanks, > > > > > > Tom > > > > > > On Tue, Jun 1, 2021 at 4:48 PM Chris Egerton > > > > > > wrote: > > > > > > > Hi Gunnar, > > > > > > > > Thanks for taking a look! I've addressed the low-hanging fruit in the > > > KIP; > > > > responses to other comments inline here: > > > > > > > > > * TransactionContext: What's the use case for the methods > accepting a > > > > source record (commitTransaction(SourceRecord > > > > record), abortTransaction(SourceRecord record))? > > > > > > > > This allows developers to decouple transaction boundaries from record > > > > batches. If a connector has a configuration that dictates how often > it > > > > returns from "SourceTask::poll", for example, it may be easier to > > define > > > > multiple transactions within a single batch or a single transaction > > > across > > > > several batches than to retrofit the connector
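To make the upgrade path proposed above concrete, here is a rough sketch of the worker configuration across the two rolling upgrades, using the "exactly.once.source.support" property name and values suggested in this message (the exact semantics are still being settled in the KIP):

# Phase 1 (first rolling upgrade): move workers to 3.0 and opt the leader in to a transactional producer
exactly.once.source.support=preparing

# Phase 2 (second rolling upgrade): once every worker is on 3.0 in "preparing" mode, enable full support
exactly.once.source.support=enabled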
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Chris, Sorry for the late question/comment. But the "Breaking Changes" concerns me. IIUC, when a user upgrades their 1.x or 2.x Connect cluster, then when they restart their 3.0 worker(s) the workers will fail due to this producer requirement even if they make no changes to their worker configs or connector configs. Is this correct? If so, I'm concerned about this. Even though the additional producer ACLs are seemingly minor and easy to change, it is likely that users will not read the docs before they upgrade, causing their simple upgrade to fail. And even though in 3.0 we could allow ourselves to cause breaking changes with a major release, I personally would prefer we not have any such breaking changes. Given that, what would be required for us to eliminate that breaking change, or change it from a breaking change to a prerequisite for enabling EOS support in their cluster? Thanks, Randall On Wed, Jun 2, 2021 at 8:42 AM Chris Egerton wrote: > Hi Tom, > > I do agree that it'd be safer to default to "required", but since at the > time of the 3.0 release no existing connectors will have implemented the > "SourceConnector::exactlyOnceSupport" method, it'd require all Connect > users to downgrade to "requested" anyways in order to enable exactly-once > support on their workers. The friction there seems a little excessive; we > might consider changing the default from "requested" to "required" later on > down the line after connector developers have had enough time to put out > new connector versions that implement the new API. Thoughts? > > Cheers, > > Chris > > On Wed, Jun 2, 2021 at 8:49 AM Tom Bentley wrote: > > > Hi Chris, > > > > Just a minor question: I can see why the default for exactly.once.support > > is requested (you want a good first-run experience, I assume), but it's a > > little like engineering a safety catch and then not enabling it. Wouldn't > > it be safer to default to required, so that there's no way someone can > > mistakenly not get EoS without explicitly having configured it? > > > > Thanks, > > > > Tom > > > > On Tue, Jun 1, 2021 at 4:48 PM Chris Egerton > > > wrote: > > > > > Hi Gunnar, > > > > > > Thanks for taking a look! I've addressed the low-hanging fruit in the > > KIP; > > > responses to other comments inline here: > > > > > > > * TransactionContext: What's the use case for the methods accepting a > > > source record (commitTransaction(SourceRecord > > > record), abortTransaction(SourceRecord record))? > > > > > > This allows developers to decouple transaction boundaries from record > > > batches. If a connector has a configuration that dictates how often it > > > returns from "SourceTask::poll", for example, it may be easier to > define > > > multiple transactions within a single batch or a single transaction > > across > > > several batches than to retrofit the connector's poll logic to work > with > > > transaction boundaries. > > > > > > > * SourceTaskContext: Instead of guarding against NSME, is there a way > > for > > > a > > > connector to query the KC version and thus derive its capabilities? > Going > > > forward, a generic API for querying capabilities could be nice, so a > > > connector can query for capabilities of the runtime in a safe and > > > compatible way. > > > > > > This would be a great quality-of-life improvement for connector and > > > framework developers alike, but I think it may be best left for a > > separate > > > KIP. The current approach, clunky though it may be, seems like a > nuisance > > > at worst. 
It's definitely worth addressing but I'm not sure we have the > > > time to think through all the details thoroughly enough in time for the > > > upcoming KIP freeze. > > > > > > > * SourceConnector: Would it make sense to merge the two methods > perhaps > > > and > > > return one enum of { SUPPORTED, NOT_SUPPORTED, > SUPPORTED_WITH_BOUNDARIES > > }? > > > > > > Hmm... at first glance I like the idea of merging the two methods a > lot. > > > The one thing that gives me pause is that there may be connectors that > > > would like to define their own transaction boundaries without providing > > > exactly-once guarantees. We could add UNSUPPORTED_WITH_BOUNDARIES to > > > accommodate that, but then, it might actually be simpler to keep the > two > > > methods separate in case we add some third variable to the mix that > would > > > also have to be reflected in the possible ExactlyOnceSupport enum > values. > > > > > > > Or, alternatively return an enum from > canDefineTransactionBoundaries(), > > > too; even if it only has two values now, that'd allow for extension in > > the > > > future > > > > > > This is fine by me; we just have to figure out exactly which enum > values > > > would be suitable. It's a little clunky but right now I'm toying with > > > something like "ConnectorDefinedTransactionBoundaries" with values of > > > "SUPPORTED" and "NOT_SUPPORTED" and a default of "NOT_SUPPORTED". If we > > > need more granularity in the future t
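For readers wondering what "the additional producer ACLs" amount to in practice: elsewhere in this thread they are described as write/describe on a fixed transactional ID plus idempotent write on the Kafka cluster that hosts the config topic. A rough sketch of granting them with the standard kafka-acls tool, where the principal and transactional ID are placeholders rather than names defined by the KIP:

bin/kafka-acls.sh --bootstrap-server localhost:9092 --add \
  --allow-principal User:connect-worker \
  --operation Write --operation Describe \
  --transactional-id my-connect-cluster-transactional-id

bin/kafka-acls.sh --bootstrap-server localhost:9092 --add \
  --allow-principal User:connect-worker \
  --operation IdempotentWrite --cluster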
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Tom, I do agree that it'd be safer to default to "required", but since at the time of the 3.0 release no existing connectors will have implemented the "SourceConnector::exactlyOnceSupport" method, it'd require all Connect users to downgrade to "requested" anyways in order to enable exactly-once support on their workers. The friction there seems a little excessive; we might consider changing the default from "requested" to "required" later on down the line after connector developers have had enough time to put out new connector versions that implement the new API. Thoughts? Cheers, Chris On Wed, Jun 2, 2021 at 8:49 AM Tom Bentley wrote: > Hi Chris, > > Just a minor question: I can see why the default for exactly.once.support > is requested (you want a good first-run experience, I assume), but it's a > little like engineering a safety catch and then not enabling it. Wouldn't > it be safer to default to required, so that there's no way someone can > mistakenly not get EoS without explicitly having configured it? > > Thanks, > > Tom > > On Tue, Jun 1, 2021 at 4:48 PM Chris Egerton > wrote: > > > Hi Gunnar, > > > > Thanks for taking a look! I've addressed the low-hanging fruit in the > KIP; > > responses to other comments inline here: > > > > > * TransactionContext: What's the use case for the methods accepting a > > source record (commitTransaction(SourceRecord > > record), abortTransaction(SourceRecord record))? > > > > This allows developers to decouple transaction boundaries from record > > batches. If a connector has a configuration that dictates how often it > > returns from "SourceTask::poll", for example, it may be easier to define > > multiple transactions within a single batch or a single transaction > across > > several batches than to retrofit the connector's poll logic to work with > > transaction boundaries. > > > > > * SourceTaskContext: Instead of guarding against NSME, is there a way > for > > a > > connector to query the KC version and thus derive its capabilities? Going > > forward, a generic API for querying capabilities could be nice, so a > > connector can query for capabilities of the runtime in a safe and > > compatible way. > > > > This would be a great quality-of-life improvement for connector and > > framework developers alike, but I think it may be best left for a > separate > > KIP. The current approach, clunky though it may be, seems like a nuisance > > at worst. It's definitely worth addressing but I'm not sure we have the > > time to think through all the details thoroughly enough in time for the > > upcoming KIP freeze. > > > > > * SourceConnector: Would it make sense to merge the two methods perhaps > > and > > return one enum of { SUPPORTED, NOT_SUPPORTED, SUPPORTED_WITH_BOUNDARIES > }? > > > > Hmm... at first glance I like the idea of merging the two methods a lot. > > The one thing that gives me pause is that there may be connectors that > > would like to define their own transaction boundaries without providing > > exactly-once guarantees. We could add UNSUPPORTED_WITH_BOUNDARIES to > > accommodate that, but then, it might actually be simpler to keep the two > > methods separate in case we add some third variable to the mix that would > > also have to be reflected in the possible ExactlyOnceSupport enum values. 
> > > > > Or, alternatively return an enum from canDefineTransactionBoundaries(), > > too; even if it only has two values now, that'd allow for extension in > the > > future > > > > This is fine by me; we just have to figure out exactly which enum values > > would be suitable. It's a little clunky but right now I'm toying with > > something like "ConnectorDefinedTransactionBoundaries" with values of > > "SUPPORTED" and "NOT_SUPPORTED" and a default of "NOT_SUPPORTED". If we > > need more granularity in the future then we can deprecate one or both of > > them and add new values. Thoughts? > > > > > And one general question: in Debezium, we have some connectors that > > produce > > records "out-of-bands" to a schema history topic via their own custom > > producer. Is there any way envisionable where such a producer would > > participate in the transaction managed by the KC runtime environment? > > > > To answer the question exactly as asked: no; transactions cannot be > shared > > across producers and until/unless that is changed (which seems unlikely) > > this won't be possible. However, I'm curious why a source connector would > > spin up its own producer instead of using "SourceTask::poll" to provide > > records to Connect. Is it easier to consume from that topic when the > > connector can define its own (de)serialization format? I'm optimistic > that > > if we understand the use case for the separate producer we may still be > > able to help bridge the gap here, one way or another. > > > > > One follow-up question after thinking some more about this; is there > any > > limit in terms of duration or size of in-flight, connector-controlled > > transactions?
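Since this exchange discusses "exactly.once.support" with "requested" and "required" values alongside per-connector preflight validation, here is a sketch of how a user might opt a single connector into the stricter check via the standard connector config endpoint; the connector class and name are placeholders, and whether the property is spelled exactly this way per connector is still part of the ongoing design:

PUT /connectors/my-source-connector/config
{
  "connector.class": "com.example.MySourceConnector",
  "tasks.max": "1",
  "exactly.once.support": "required"
}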
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Chris, Just a minor question: I can see why the default for exactly.once.support is requested (you want a good first-run experience, I assume), but it's a little like engineering a safety catch and then not enabling it. Wouldn't it be safer to default to required, so that there's no way someone can mistakenly not get EoS without explicitly having configured it? Thanks, Tom On Tue, Jun 1, 2021 at 4:48 PM Chris Egerton wrote: > Hi Gunnar, > > Thanks for taking a look! I've addressed the low-hanging fruit in the KIP; > responses to other comments inline here: > > > * TransactionContext: What's the use case for the methods accepting a > source record (commitTransaction(SourceRecord > record), abortTransaction(SourceRecord record))? > > This allows developers to decouple transaction boundaries from record > batches. If a connector has a configuration that dictates how often it > returns from "SourceTask::poll", for example, it may be easier to define > multiple transactions within a single batch or a single transaction across > several batches than to retrofit the connector's poll logic to work with > transaction boundaries. > > > * SourceTaskContext: Instead of guarding against NSME, is there a way for > a > connector to query the KC version and thus derive its capabilities? Going > forward, a generic API for querying capabilities could be nice, so a > connector can query for capabilities of the runtime in a safe and > compatible way. > > This would be a great quality-of-life improvement for connector and > framework developers alike, but I think it may be best left for a separate > KIP. The current approach, clunky though it may be, seems like a nuisance > at worst. It's definitely worth addressing but I'm not sure we have the > time to think through all the details thoroughly enough in time for the > upcoming KIP freeze. > > > * SourceConnector: Would it make sense to merge the two methods perhaps > and > return one enum of { SUPPORTED, NOT_SUPPORTED, SUPPORTED_WITH_BOUNDARIES }? > > Hmm... at first glance I like the idea of merging the two methods a lot. > The one thing that gives me pause is that there may be connectors that > would like to define their own transaction boundaries without providing > exactly-once guarantees. We could add UNSUPPORTED_WITH_BOUNDARIES to > accommodate that, but then, it might actually be simpler to keep the two > methods separate in case we add some third variable to the mix that would > also have to be reflected in the possible ExactlyOnceSupport enum values. > > > Or, alternatively return an enum from canDefineTransactionBoundaries(), > too; even if it only has two values now, that'd allow for extension in the > future > > This is fine by me; we just have to figure out exactly which enum values > would be suitable. It's a little clunky but right now I'm toying with > something like "ConnectorDefinedTransactionBoundaries" with values of > "SUPPORTED" and "NOT_SUPPORTED" and a default of "NOT_SUPPORTED". If we > need more granularity in the future then we can deprecate one or both of > them and add new values. Thoughts? > > > And one general question: in Debezium, we have some connectors that > produce > records "out-of-bands" to a schema history topic via their own custom > producer. Is there any way envisionable where such a producer would > participate in the transaction managed by the KC runtime environment? 
> > To answer the question exactly as asked: no; transactions cannot be shared > across producers and until/unless that is changed (which seems unlikely) > this won't be possible. However, I'm curious why a source connector would > spin up its own producer instead of using "SourceTask::poll" to provide > records to Connect. Is it easier to consume from that topic when the > connector can define its own (de)serialization format? I'm optimistic that > if we understand the use case for the separate producer we may still be > able to help bridge the gap here, one way or another. > > > One follow-up question after thinking some more about this; is there any > limit in terms of duration or size of in-flight, connector-controlled > transactions? In case of Debezium for instance, there may be cases where we > tail the TX log from an upstream source database, not knowing whether the > events we receive belong to a committed or aborted transaction. Would it be > valid to emit all these events via a transactional task, and in case we > receive a ROLLBACK event eventually, to abort the pending Kafka > transaction? Such source transactions could be running for a long time > potentially, e.g. hours or days (at least in theory). Or would this sort of > usage not be considered a reasonable one? > > I think the distinction between reasonable and unreasonable usage here is > likely dependent on use cases that people are trying to satisfy with their > connector, but if I had to guess, I'd say that a different approach is > probably warranted in most cases if the transaction span
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Gunnar, Thanks for taking a look! I've addressed the low-hanging fruit in the KIP; responses to other comments inline here: > * TransactionContext: What's the use case for the methods accepting a source record (commitTransaction(SourceRecord record), abortTransaction(SourceRecord record))? This allows developers to decouple transaction boundaries from record batches. If a connector has a configuration that dictates how often it returns from "SourceTask::poll", for example, it may be easier to define multiple transactions within a single batch or a single transaction across several batches than to retrofit the connector's poll logic to work with transaction boundaries. > * SourceTaskContext: Instead of guarding against NSME, is there a way for a connector to query the KC version and thus derive its capabilities? Going forward, a generic API for querying capabilities could be nice, so a connector can query for capabilities of the runtime in a safe and compatible way. This would be a great quality-of-life improvement for connector and framework developers alike, but I think it may be best left for a separate KIP. The current approach, clunky though it may be, seems like a nuisance at worst. It's definitely worth addressing but I'm not sure we have the time to think through all the details thoroughly enough in time for the upcoming KIP freeze. > * SourceConnector: Would it make sense to merge the two methods perhaps and return one enum of { SUPPORTED, NOT_SUPPORTED, SUPPORTED_WITH_BOUNDARIES }? Hmm... at first glance I like the idea of merging the two methods a lot. The one thing that gives me pause is that there may be connectors that would like to define their own transaction boundaries without providing exactly-once guarantees. We could add UNSUPPORTED_WITH_BOUNDARIES to accommodate that, but then, it might actually be simpler to keep the two methods separate in case we add some third variable to the mix that would also have to be reflected in the possible ExactlyOnceSupport enum values. > Or, alternatively return an enum from canDefineTransactionBoundaries(), too; even if it only has two values now, that'd allow for extension in the future This is fine by me; we just have to figure out exactly which enum values would be suitable. It's a little clunky but right now I'm toying with something like "ConnectorDefinedTransactionBoundaries" with values of "SUPPORTED" and "NOT_SUPPORTED" and a default of "NOT_SUPPORTED". If we need more granularity in the future then we can deprecate one or both of them and add new values. Thoughts? > And one general question: in Debezium, we have some connectors that produce records "out-of-bands" to a schema history topic via their own custom producer. Is there any way envisionable where such a producer would participate in the transaction managed by the KC runtime environment? To answer the question exactly as asked: no; transactions cannot be shared across producers and until/unless that is changed (which seems unlikely) this won't be possible. However, I'm curious why a source connector would spin up its own producer instead of using "SourceTask::poll" to provide records to Connect. Is it easier to consume from that topic when the connector can define its own (de)serialization format? I'm optimistic that if we understand the use case for the separate producer we may still be able to help bridge the gap here, one way or another. 
> One follow-up question after thinking some more about this; is there any limit in terms of duration or size of in-flight, connector-controlled transactions? In case of Debezium for instance, there may be cases where we tail the TX log from an upstream source database, not knowing whether the events we receive belong to a committed or aborted transaction. Would it be valid to emit all these events via a transactional task, and in case we receive a ROLLBACK event eventually, to abort the pending Kafka transaction? Such source transactions could be running for a long time potentially, e.g. hours or days (at least in theory). Or would this sort of usage not be considered a reasonable one? I think the distinction between reasonable and unreasonable usage here is likely dependent on use cases that people are trying to satisfy with their connector, but if I had to guess, I'd say that a different approach is probably warranted in most cases if the transaction spans across entire days at a time. If there's no concern about data not being visible to downstream consumers until its transaction is committed, and the number of records in the transaction is small enough that the amount of memory required to buffer them all locally on a consumer before delivering them to the downstream application remains reasonable, it would technically be possible though. Connect users would have to be mindful of the following: - A separate offsets topic for the connector would be highly recommended in order to avoid crippling other connectors with hanging transactions
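For illustration, a rough sketch of what a connector overriding both of the methods discussed above might look like, assuming the enum names floated in this thread (ExactlyOnceSupport, and a ConnectorDefinedTransactionBoundaries enum with SUPPORTED/NOT_SUPPORTED) and assuming both methods receive the connector configuration; none of these signatures are final at this point in the discussion:

public class MySourceConnector extends SourceConnector {

    // Report that, for the given configuration, this connector can provide exactly-once delivery
    @Override
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        return ExactlyOnceSupport.SUPPORTED;
    }

    // Report that this connector's tasks will define their own transaction boundaries
    @Override
    public ConnectorDefinedTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
        return ConnectorDefinedTransactionBoundaries.SUPPORTED;
    }

    // start(), taskClass(), taskConfigs(), stop(), config(), and version() omitted for brevity
}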
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Chris, One follow-up question after thinking some more about this; is there any limit in terms of duration or size of in-flight, connector-controlled transactions? In case of Debezium for instance, there may be cases where we tail the TX log from an upstream source database, not knowing whether the events we receive belong to a committed or aborted transaction. Would it be valid to emit all these events via a transactional task, and in case we receive a ROLLBACK event eventually, to abort the pending Kafka transaction? Such source transactions could be running for a long time potentially, e.g. hours or days (at least in theory). Or would this sort of usage not be considered a reasonable one? Thanks, --Gunnar Am Do., 27. Mai 2021 um 23:15 Uhr schrieb Gunnar Morling < gunnar.morl...@googlemail.com>: > Chris, all, > > I've just read KIP-618, and let me congratulate you first of all for this > impressive piece of work! Here's a few small suggestions and questions I > had while reading: > > * TransactionContext: What's the use case for the methods accepting a > source record (commitTransaction(SourceRecord > record), abortTransaction(SourceRecord record))? > * SourceTaskContext: Typo in "when the sink connector is deployed" -> > source task > * SourceTaskContext: Instead of guarding against NSME, is there a way for > a connector to query the KC version and thus derive its capabilities? Going > forward, a generic API for querying capabilities could be nice, so a > connector can query for capabilities of the runtime in a safe and > compatible way. > * SourceConnector: exactlyOnceSupport() -> false return value doesn't match > * SourceConnector: Would it make sense to merge the two methods perhaps > and return one enum of { SUPPORTED, NOT_SUPPORTED, > SUPPORTED_WITH_BOUNDARIES }? Or, alternatively return an enum > from canDefineTransactionBoundaries(), too; even if it only has two values > now, that'd allow for extension in the future > > And one general question: in Debezium, we have some connectors that > produce records "out-of-bands" to a schema history topic via their own > custom producer. Is there any way envisionable where such a producer would > participate in the transaction managed by the KC runtime environment? > > Thanks a lot, > > --Gunnar > > > Am Mo., 24. Mai 2021 um 18:45 Uhr schrieb Chris Egerton > : > >> Hi all, >> >> Wanted to note here that I've updated the KIP document to include the >> changes discussed recently. They're mostly located in the "Public >> Interfaces" section. I suspect discussion hasn't concluded yet and there >> will probably be a few more changes to come, but wanted to take the >> opportunity to provide a snapshot of what the current design looks like. >> >> Cheers, >> >> Chris >> >> On Fri, May 21, 2021 at 4:32 PM Chris Egerton >> wrote: >> >> > Hi Tom, >> > >> > Wow, I was way off base! I was thinking that the intent of the fencible >> > producer was to employ it by default with 3.0, as opposed to only after >> the >> > worker-level >> > "exactly.once.source.enabled" property was flipped on. You are correct >> > that with the case you were actually describing, there would be no >> > heightened ACL requirements, and that it would leave room in the future >> for >> > exactly-once to be disabled on a per-connector basis (as long as all the >> > workers in the cluster already had "exactly.once.source.enabled" set to >> > "true") with no worries about breaking changes. 
>> > >> > I agree that this is something for another KIP; even if we could squeeze >> > it in in time for this release, it might be a bit much for new users to >> > take in all at once. But I can add it to the doc as "future work" since >> > it's a promising idea that could prove valuable to someone who might >> need >> > per-connector granularity in the future. >> > >> > Thanks for clearing things up; in retrospect your comments make a lot >> more >> > sense now, and I hope I've sufficiently addressed them by now. >> > >> > PSA for you and everyone else--I plan on updating the doc next week with >> > the new APIs for connector-defined transaction boundaries, >> > user-configurable transaction boundaries (i.e., poll vs. interval vs. >> > connectors), and preflight checks for exactly-once validation (required >> vs. >> > requested). >> > >> > Cheers, >> > >> > Chris >> > >> > On Fri, May 21, 2021 at 7:14 AM Tom Bentley >> wrote: >> > >> >> Hi Chris, >> >> >> >> Thanks for continuing to entertain some of these ideas. >> >> >> >> On Fri, May 14, 2021 at 5:06 PM Chris Egerton >> > >> > >> >> wrote: >> >> >> >> > [...] >> >> > >> >> That's true, but we do go from three static ACLs (write/describe on a >> >> fixed >> >> > transactional ID, and idempotent write on a fixed cluster) to a >> dynamic >> >> > collection of ACLs. >> >> > >> >> >> >> I'm not quite sure I follow, maybe I've lost track. To be clear, I was >> >> suggesting the use of a 'fencing producer' only in clusters with >> >> exactly.once.source.ena
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Chris, all, I've just read KIP-618, and let me congratulate you first of all for this impressive piece of work! Here's a few small suggestions and questions I had while reading: * TransactionContext: What's the use case for the methods accepting a source record (commitTransaction(SourceRecord record), abortTransaction(SourceRecord record))? * SourceTaskContext: Typo in "when the sink connector is deployed" -> source task * SourceTaskContext: Instead of guarding against NSME, is there a way for a connector to query the KC version and thus derive its capabilities? Going forward, a generic API for querying capabilities could be nice, so a connector can query for capabilities of the runtime in a safe and compatible way. * SourceConnector: exactlyOnceSupport() -> false return value doesn't match * SourceConnector: Would it make sense to merge the two methods perhaps and return one enum of { SUPPORTED, NOT_SUPPORTED, SUPPORTED_WITH_BOUNDARIES }? Or, alternatively return an enum from canDefineTransactionBoundaries(), too; even if it only has two values now, that'd allow for extension in the future And one general question: in Debezium, we have some connectors that produce records "out-of-bands" to a schema history topic via their own custom producer. Is there any way envisionable where such a producer would participate in the transaction managed by the KC runtime environment? Thanks a lot, --Gunnar Am Mo., 24. Mai 2021 um 18:45 Uhr schrieb Chris Egerton : > Hi all, > > Wanted to note here that I've updated the KIP document to include the > changes discussed recently. They're mostly located in the "Public > Interfaces" section. I suspect discussion hasn't concluded yet and there > will probably be a few more changes to come, but wanted to take the > opportunity to provide a snapshot of what the current design looks like. > > Cheers, > > Chris > > On Fri, May 21, 2021 at 4:32 PM Chris Egerton wrote: > > > Hi Tom, > > > > Wow, I was way off base! I was thinking that the intent of the fencible > > producer was to employ it by default with 3.0, as opposed to only after > the > > worker-level > > "exactly.once.source.enabled" property was flipped on. You are correct > > that with the case you were actually describing, there would be no > > heightened ACL requirements, and that it would leave room in the future > for > > exactly-once to be disabled on a per-connector basis (as long as all the > > workers in the cluster already had "exactly.once.source.enabled" set to > > "true") with no worries about breaking changes. > > > > I agree that this is something for another KIP; even if we could squeeze > > it in in time for this release, it might be a bit much for new users to > > take in all at once. But I can add it to the doc as "future work" since > > it's a promising idea that could prove valuable to someone who might need > > per-connector granularity in the future. > > > > Thanks for clearing things up; in retrospect your comments make a lot > more > > sense now, and I hope I've sufficiently addressed them by now. > > > > PSA for you and everyone else--I plan on updating the doc next week with > > the new APIs for connector-defined transaction boundaries, > > user-configurable transaction boundaries (i.e., poll vs. interval vs. > > connectors), and preflight checks for exactly-once validation (required > vs. > > requested). > > > > Cheers, > > > > Chris > > > > On Fri, May 21, 2021 at 7:14 AM Tom Bentley wrote: > > > >> Hi Chris, > >> > >> Thanks for continuing to entertain some of these ideas. 
> >> > >> On Fri, May 14, 2021 at 5:06 PM Chris Egerton > >> > > >> wrote: > >> > >> > [...] > >> > > >> That's true, but we do go from three static ACLs (write/describe on a > >> fixed > >> > transactional ID, and idempotent write on a fixed cluster) to a > dynamic > >> > collection of ACLs. > >> > > >> > >> I'm not quite sure I follow, maybe I've lost track. To be clear, I was > >> suggesting the use of a 'fencing producer' only in clusters with > >> exactly.once.source.enabled=true where I imagined the key difference > >> between the exactly once and fencing cases was how the producer was > >> configured/used (transactional vs this new fencing semantic). I think > the > >> ACL requirements for connector producer principals would therefore be > the > >> same as currently described in the KIP. The same is true for the worker > >> principals (which is the only breaking change you give in the KIP). So I > >> don't think the fencing idea changes the backwards compatibility story > >> that's already in the KIP, just allows a safe per-connector > >> exactly.once=disabled option to be supported (with required as requested > >> as > >> we already discussed). > >> > >> But I'm wondering whether I've overlooked something. > >> > >> Ultimately I think it may behoove us to err on the side of reducing the > >> > breaking changes here for now and saving them for 4.0 (or some later > >> major > >> > release), but would be interes
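For context, the "guarding against NSME" that Gunnar refers to above is the pattern of catching the linkage error once at startup when a connector compiled against a newer Connect API runs on an older worker. A rough sketch, using the transactionContext() accessor discussed elsewhere in this thread purely as an example of such a new method:

private TransactionContext transactionContext;

@Override
public void start(Map<String, String> props) {
    try {
        // New method proposed in this KIP; it only exists on workers that ship it
        transactionContext = context.transactionContext();
    } catch (NoSuchMethodError | NoClassDefFoundError e) {
        // Older Connect runtime: fall back to letting the worker define transaction boundaries
        transactionContext = null;
    }
}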
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi all, Wanted to note here that I've updated the KIP document to include the changes discussed recently. They're mostly located in the "Public Interfaces" section. I suspect discussion hasn't concluded yet and there will probably be a few more changes to come, but wanted to take the opportunity to provide a snapshot of what the current design looks like. Cheers, Chris On Fri, May 21, 2021 at 4:32 PM Chris Egerton wrote: > Hi Tom, > > Wow, I was way off base! I was thinking that the intent of the fencible > producer was to employ it by default with 3.0, as opposed to only after the > worker-level > "exactly.once.source.enabled" property was flipped on. You are correct > that with the case you were actually describing, there would be no > heightened ACL requirements, and that it would leave room in the future for > exactly-once to be disabled on a per-connector basis (as long as all the > workers in the cluster already had "exactly.once.source.enabled" set to > "true") with no worries about breaking changes. > > I agree that this is something for another KIP; even if we could squeeze > it in in time for this release, it might be a bit much for new users to > take in all at once. But I can add it to the doc as "future work" since > it's a promising idea that could prove valuable to someone who might need > per-connector granularity in the future. > > Thanks for clearing things up; in retrospect your comments make a lot more > sense now, and I hope I've sufficiently addressed them by now. > > PSA for you and everyone else--I plan on updating the doc next week with > the new APIs for connector-defined transaction boundaries, > user-configurable transaction boundaries (i.e., poll vs. interval vs. > connectors), and preflight checks for exactly-once validation (required vs. > requested). > > Cheers, > > Chris > > On Fri, May 21, 2021 at 7:14 AM Tom Bentley wrote: > >> Hi Chris, >> >> Thanks for continuing to entertain some of these ideas. >> >> On Fri, May 14, 2021 at 5:06 PM Chris Egerton > > >> wrote: >> >> > [...] >> > >> That's true, but we do go from three static ACLs (write/describe on a >> fixed >> > transactional ID, and idempotent write on a fixed cluster) to a dynamic >> > collection of ACLs. >> > >> >> I'm not quite sure I follow, maybe I've lost track. To be clear, I was >> suggesting the use of a 'fencing producer' only in clusters with >> exactly.once.source.enabled=true where I imagined the key difference >> between the exactly once and fencing cases was how the producer was >> configured/used (transactional vs this new fencing semantic). I think the >> ACL requirements for connector producer principals would therefore be the >> same as currently described in the KIP. The same is true for the worker >> principals (which is the only breaking change you give in the KIP). So I >> don't think the fencing idea changes the backwards compatibility story >> that's already in the KIP, just allows a safe per-connector >> exactly.once=disabled option to be supported (with required as requested >> as >> we already discussed). >> >> But I'm wondering whether I've overlooked something. >> >> Ultimately I think it may behoove us to err on the side of reducing the >> > breaking changes here for now and saving them for 4.0 (or some later >> major >> > release), but would be interested in thoughts from you and others. >> > >> >> Difficult to answer (given I think I might be missing something). >> If there are breaking changes then I don't disagree. 
It's difficult to >> reason about big changes like this without some practical experience. >> If there are not, then I think we could also implement the whole >> exactly.once=disabled thing in a later KIP without additional breaking >> changes (i.e. some time in 3.x), right? >> >> >> > > Gouzhang also has a (possible) use case for a fencing-only producer ( >> > https://issues.apache.org/jira/browse/KAFKA-12693), and as he points >> out >> > there, you should be able to get these semantics today by calling >> > initTransactions() and then just using the producer as normal (no >> > beginTransaction()/abortTransaction()/endTransaction()). >> > >> > I tested this locally and was not met with success; transactional >> producers >> > do a check right now to ensure that any calls to "KafkaProducer::send" >> > occur within a transaction (see >> > >> > >> https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L957-L959 >> > and >> > >> > >> https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L450-L451 >> > ). >> > Not a blocker, just noting that we'd have to do some legwork to make >> this >> > workable with the producer API. >> > >> >> Ah, sorry, I should have actually tried it rather than just taking a quick >> look at the code. >> >> Rather than remove those s
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Tom, Wow, I was way off base! I was thinking that the intent of the fencible producer was to employ it by default with 3.0, as opposed to only after the worker-level "exactly.once.source.enabled" property was flipped on. You are correct that with the case you were actually describing, there would be no heightened ACL requirements, and that it would leave room in the future for exactly-once to be disabled on a per-connector basis (as long as all the workers in the cluster already had "exactly.once.source.enabled" set to "true") with no worries about breaking changes. I agree that this is something for another KIP; even if we could squeeze it in in time for this release, it might be a bit much for new users to take in all at once. But I can add it to the doc as "future work" since it's a promising idea that could prove valuable to someone who might need per-connector granularity in the future. Thanks for clearing things up; in retrospect your comments make a lot more sense now, and I hope I've sufficiently addressed them by now. PSA for you and everyone else--I plan on updating the doc next week with the new APIs for connector-defined transaction boundaries, user-configurable transaction boundaries (i.e., poll vs. interval vs. connectors), and preflight checks for exactly-once validation (required vs. requested). Cheers, Chris On Fri, May 21, 2021 at 7:14 AM Tom Bentley wrote: > Hi Chris, > > Thanks for continuing to entertain some of these ideas. > > On Fri, May 14, 2021 at 5:06 PM Chris Egerton > > wrote: > > > [...] > > > That's true, but we do go from three static ACLs (write/describe on a fixed > > transactional ID, and idempotent write on a fixed cluster) to a dynamic > > collection of ACLs. > > > > I'm not quite sure I follow, maybe I've lost track. To be clear, I was > suggesting the use of a 'fencing producer' only in clusters with > exactly.once.source.enabled=true where I imagined the key difference > between the exactly once and fencing cases was how the producer was > configured/used (transactional vs this new fencing semantic). I think the > ACL requirements for connector producer principals would therefore be the > same as currently described in the KIP. The same is true for the worker > principals (which is the only breaking change you give in the KIP). So I > don't think the fencing idea changes the backwards compatibility story > that's already in the KIP, just allows a safe per-connector > exactly.once=disabled option to be supported (with required as requested as > we already discussed). > > But I'm wondering whether I've overlooked something. > > Ultimately I think it may behoove us to err on the side of reducing the > > breaking changes here for now and saving them for 4.0 (or some later > major > > release), but would be interested in thoughts from you and others. > > > > Difficult to answer (given I think I might be missing something). > If there are breaking changes then I don't disagree. It's difficult to > reason about big changes like this without some practical experience. > If there are not, then I think we could also implement the whole > exactly.once=disabled thing in a later KIP without additional breaking > changes (i.e. some time in 3.x), right? 
> > > > > Gouzhang also has a (possible) use case for a fencing-only producer ( > > https://issues.apache.org/jira/browse/KAFKA-12693), and as he points out > > there, you should be able to get these semantics today by calling > > initTransactions() and then just using the producer as normal (no > > beginTransaction()/abortTransaction()/endTransaction()). > > > > I tested this locally and was not met with success; transactional > producers > > do a check right now to ensure that any calls to "KafkaProducer::send" > > occur within a transaction (see > > > > > https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L957-L959 > > and > > > > > https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L450-L451 > > ). > > Not a blocker, just noting that we'd have to do some legwork to make this > > workable with the producer API. > > > > Ah, sorry, I should have actually tried it rather than just taking a quick > look at the code. > > Rather than remove those safety checks I suppose we'd need a way of > distinguishing, in the config, the difference in semantics. E.g. Something > like a fencing.id config, which was mutually exclusive with > transactional.id. > Likewise perhaps initFencing() alongside initTransactions() in the API. But > I think at this point it's something for another KIP. > > Kind regards, > > Tom >
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Chris, Thanks for continuing to entertain some of these ideas. On Fri, May 14, 2021 at 5:06 PM Chris Egerton wrote: > [...] > That's true, but we do go from three static ACLs (write/describe on a fixed > transactional ID, and idempotent write on a fixed cluster) to a dynamic > collection of ACLs. > I'm not quite sure I follow, maybe I've lost track. To be clear, I was suggesting the use of a 'fencing producer' only in clusters with exactly.once.source.enabled=true where I imagined the key difference between the exactly once and fencing cases was how the producer was configured/used (transactional vs this new fencing semantic). I think the ACL requirements for connector producer principals would therefore be the same as currently described in the KIP. The same is true for the worker principals (which is the only breaking change you give in the KIP). So I don't think the fencing idea changes the backwards compatibility story that's already in the KIP, just allows a safe per-connector exactly.once=disabled option to be supported (with required as requested as we already discussed). But I'm wondering whether I've overlooked something. Ultimately I think it may behoove us to err on the side of reducing the > breaking changes here for now and saving them for 4.0 (or some later major > release), but would be interested in thoughts from you and others. > Difficult to answer (given I think I might be missing something). If there are breaking changes then I don't disagree. It's difficult to reason about big changes like this without some practical experience. If there are not, then I think we could also implement the whole exactly.once=disabled thing in a later KIP without additional breaking changes (i.e. some time in 3.x), right? > > Gouzhang also has a (possible) use case for a fencing-only producer ( > https://issues.apache.org/jira/browse/KAFKA-12693), and as he points out > there, you should be able to get these semantics today by calling > initTransactions() and then just using the producer as normal (no > beginTransaction()/abortTransaction()/endTransaction()). > > I tested this locally and was not met with success; transactional producers > do a check right now to ensure that any calls to "KafkaProducer::send" > occur within a transaction (see > > https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L957-L959 > and > > https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L450-L451 > ). > Not a blocker, just noting that we'd have to do some legwork to make this > workable with the producer API. > Ah, sorry, I should have actually tried it rather than just taking a quick look at the code. Rather than remove those safety checks I suppose we'd need a way of distinguishing, in the config, the difference in semantics. E.g. Something like a fencing.id config, which was mutually exclusive with transactional.id. Likewise perhaps initFencing() alongside initTransactions() in the API. But I think at this point it's something for another KIP. Kind regards, Tom
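For reference, the local test Chris describes amounts to something like the following; with the current client the send() is rejected by the checks linked above, because a transactional producer only permits sends inside an open transaction (broker address, transactional ID, and topic are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class FencingOnlyProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "fencing-test");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Fences out any previous producer instance using the same transactional.id
            producer.initTransactions();
            // No beginTransaction() here: today this send() fails, since the producer
            // requires an active transaction before records can be sent
            producer.send(new ProducerRecord<>("test-topic", "key", "value"));
        }
    }
}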
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hey Jeremy, Thanks for taking a look! Always nice to have input from connector developers, especially ones as prolific as you. I was hoping to leave connector-defined transaction boundaries for future work as the use cases for them were unclear. For example, with transactions in an upstream data source, if a connector developer wants to preserve those transactions downstream, their connector can produce source records representing those transaction boundaries, and allow users to then perform their own filtering logic based on those records. Alternatively, the connector can do that filtering itself by refusing to produce any records until the upstream transaction from which they are derived is committed. Given that it's pretty much a sure thing at this point that producer transaction commits will not subdivide task-defined batches (where a batch is the collection of records returned in a single call to "SourceTask::poll"), I wonder if the Spooldir connector could follow that second approach: buffer all the records for a file and then provide them to Connect all in one batch. And, if this is infeasible for large files, the question then becomes--how would we expect downstream applications to handle this? A large amount of the recent discussion here has been centered around the problems posed by large transactions, both in terms of increased latency and heightened memory requirements for consumers (which would have to buffer all the records in that transaction locally until a commit marker is encountered, on a per-topic-partition basis). As far as heightened latency goes, if you need whole files written at once, it seems like a reasonable tradeoff to make that probably won't ruffle a lot of feathers (although your input here to validate that assumption would be valuable). With regards to memory requirements--I guess it's not out of the question to suggest that users who truly need atomic transfer of entire files at a time be equipped with the right hardware to be able to support it. In downstream applications this is unavoidable until/unless broker-side transaction filtering is implemented. However, if we allow connector-defined transaction boundaries, connector tasks wouldn't need the additional memory as they would be able to send records to Connect as they read them, which would respond by dispatching them to a producer on the fly. On the other hand, if tasks have to locally buffer records in order to be able to return them all in a single batch, then we inflict that new, possibly-painful memory requirement on Connect workers as well. Okay, stream of consciousness over. I think I see a valid use case for connector-defined transaction boundaries. That leaves us with the task of designing an API for connector developers, and deciding on how to provide this option to users (some may not need or want to respect the transaction boundaries defined by their connector). As far as the API goes, I think the approach you mentioned offline of using the source task context makes sense for the most part, but would introduce some small cross-compatibility issues where it would become more difficult to run newer connectors on older versions of Connect. Instead, we could take a page out of KIP-610's book and do something like this:

public interface SourceTaskContext {
    // Existing methods and fields omitted in this snippet
    public TransactionContext transactionContext();
}

A new TransactionContext interface would be introduced.
Tasks can grab an instance of it on startup from the source task context and use it to communicate transaction boundaries to the worker over the course of their lifetime. If the worker is running an older version of Connect that doesn't support this API, they'll be met with a classloading error during the initial call to "transactionContext()", and will be able to deduce from it that they won't be able to define their own transaction boundaries. The benefit is that, if this is all done on task startup, the classloading error will only have to be caught once, instead of every time any of the transaction-related methods are invoked. For the TransactionContext interface: public interface TransactionContext { public void commitTransaction(); public void commitTransactionOn(SourceRecord record); public void abortTransaction(); public void abortTransactionOn(SourceRecord record); } There would be no API for defining when a transaction begins, since we can assume that transactions begin when the first record is provided and immediately after every committed or aborted transaction. The method variants that accept SourceRecord instances would allow connectors to completely decouple record batches from transaction boundaries; they could produce single record batches with multiple transactions inside them, or define transactions that start in the middle of one batch and end in the middle of another. Tasks can be made aware of when transactions are completed (eith
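To tie this back to the file-based use case, here is a rough sketch of how a task might wrap each file in its own transaction with the API sketched above; the file-reading helpers are hypothetical, and the transactionContext field is assumed to have been obtained from the SourceTaskContext at startup as described:

private TransactionContext transactionContext;

@Override
public List<SourceRecord> poll() {
    List<SourceRecord> records = readNextChunkOfCurrentFile(); // hypothetical helper
    if (transactionContext != null && !records.isEmpty() && reachedEndOfCurrentFile()) { // hypothetical helper
        // Commit the transaction once the file's final record has been dispatched; the next
        // record the worker sees will then implicitly begin a new transaction
        transactionContext.commitTransactionOn(records.get(records.size() - 1));
    }
    return records;
}

// If the file turns out to be unparseable partway through, the pending transaction could be
// aborted instead so that none of the file's records become visible:
//     transactionContext.abortTransaction();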
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hey Chris! Nice work on this KIP! What are the thoughts about letting the connector developer control the boundaries of the transaction? For example kafka-connect-spooldir is used to parse and import files to kafka. It would be amazing if I could begin and end the transaction as I open and close files. This would allow me to guard from the dreaded exception a line 732 and wrap an entire file in a transaction. It either makes it or it doesn't. J On Tue, May 18, 2021 at 10:04 AM Chris Egerton wrote: > > Hi Randall, > > Thanks for clarifying the issues with large transactions. I think we're > starting to converge on an understanding of the problem space, if not quite > on an approach to tackle it. > > I think my biggest issue with defining transaction boundaries on a > per-task-poll basis are these points that you make: > > > First, it reuses something that connector implementations > already employ, and which many connectors have configs to tune the behavior > to maximize throughput. Second, it requires no additional configuration > properties or connector-specific behavior. Third, high-throughput > connectors are likely returning large batches, so write amplification for > the extra offset record per source partition is likely to have a much > smaller impact (assuming that most high throughput connectors have high > records-per-source-partition ratios). > > I don't believe the first and third points are safe assumptions to make. > There's actually very little, if any, performance benefit to writing source > connectors whose tasks give Connect larger batches (at least, not as far as > the framework logic goes). Every record is sequentially transformed, > converted, and dispatched to the producer, regardless of whether it came > from a batch of one or one million. So Connect does have the capability > right now to support high-throughput, small-batch connectors. > > For a concrete example, Confluent's Datagen connector ( > https://github.com/confluentinc/kafka-connect-datagen), which is used to > produce sample data for quickstarts and demos, performs just fine even > though it does absolutely no batching whatsoever and returns only a single > record per call to "SourceTask::Poll". In some not-super-rigorous > performance testing, I ran the connector three times against a local build > of the 2.8.0 release of Connect, then modified the 2.8.0 release of Connect > to perform a producer flush for every task-provided batch of records, then > ran the connector three times against that. Each test run produced exactly > one million records. The worst run out of the initial cases (against the > unmodified local 2.8.0 build) took 15 seconds to complete. The best run out > of the subsequent cases (against the modified local 2.8.0 build) took 2 > minutes and 44 seconds, or 164 seconds--over a 10x slowdown. And this was > against a single local broker with no replication factor. > > Of course, if it's a matter of accommodating this one (demo-oriented) > connector, then it might still be worth it to put the onus on the > developers of that connector to modify it appropriately to work with > exactly-once support. But my point with this connector is more > general--simply put, there don't appear to be safe grounds for the > assumption that source tasks must produce large batches in order to achieve > high throughput. 
> > > Yes, per-connector offset commit intervals is one approach that would be > more explicit, though see my concerns earlier in this email about > controlling the number of records in a transaction by a time-based config, > even if set on a per-connector basis. > > I'll try to summarize the concerns presented; let me know if I'm missing > something: > > 1. Large transaction sizes can inflate the memory requirements for > consumers and possibly even overwhelm them with out-of-memory errors. > 2. High offset commit intervals can inflate the read latency of downstream > applications. > > Combining both of these concerns with the high-throughput-small-batch > scenario, it still seems worthwhile to provide users with a way to do some > multi-batch transactions for their source tasks. This is analogous to > consumer-side buffering; yes, by default you probably want events to be > available downstream as soon as possible, but in some cases it's still > necessary to introduce a small hit in latency in order to keep up with high > throughput. And if each batch is relatively small, then heightened memory > requirements for consumers shouldn't really be a problem. > > And for other scenarios, with per-connector offset commit intervals, users > can always just set the interval to zero to get exactly the behavior that > you're describing. > > > I'm not sure that setting up a separate Connect cluster is that practical > for many Connect users, especially in larger organizations where one group > manages the cluster and others manage connectors. > > I agree; I was mostly responding to what I perceived as the assumptio
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Randall, Thanks for clarifying the issues with large transactions. I think we're starting to converge on an understanding of the problem space, if not quite on an approach to tackle it. I think my biggest issue with defining transaction boundaries on a per-task-poll basis is these points that you make: > First, it reuses something that connector implementations already employ, and which many connectors have configs to tune the behavior to maximize throughput. Second, it requires no additional configuration properties or connector-specific behavior. Third, high-throughput connectors are likely returning large batches, so write amplification for the extra offset record per source partition is likely to have a much smaller impact (assuming that most high throughput connectors have high records-per-source-partition ratios). I don't believe the first and third points are safe assumptions to make. There's actually very little, if any, performance benefit to writing source connectors whose tasks give Connect larger batches (at least, not as far as the framework logic goes). Every record is sequentially transformed, converted, and dispatched to the producer, regardless of whether it came from a batch of one or one million. So Connect does have the capability right now to support high-throughput, small-batch connectors. For a concrete example, Confluent's Datagen connector ( https://github.com/confluentinc/kafka-connect-datagen), which is used to produce sample data for quickstarts and demos, performs just fine even though it does absolutely no batching whatsoever and returns only a single record per call to "SourceTask::poll". In some not-super-rigorous performance testing, I ran the connector three times against a local build of the 2.8.0 release of Connect, then modified the 2.8.0 release of Connect to perform a producer flush for every task-provided batch of records, then ran the connector three times against that. Each test run produced exactly one million records. The worst run out of the initial cases (against the unmodified local 2.8.0 build) took 15 seconds to complete. The best run out of the subsequent cases (against the modified local 2.8.0 build) took 2 minutes and 44 seconds, or 164 seconds--over a 10x slowdown. And this was against a single local broker with no replication factor. Of course, if it's a matter of accommodating this one (demo-oriented) connector, then it might still be worth it to put the onus on the developers of that connector to modify it appropriately to work with exactly-once support. But my point with this connector is more general--simply put, there don't appear to be safe grounds for the assumption that source tasks must produce large batches in order to achieve high throughput. > Yes, per-connector offset commit intervals is one approach that would be more explicit, though see my concerns earlier in this email about controlling the number of records in a transaction by a time-based config, even if set on a per-connector basis. I'll try to summarize the concerns presented; let me know if I'm missing something: 1. Large transaction sizes can inflate the memory requirements for consumers and possibly even overwhelm them with out-of-memory errors. 2. High offset commit intervals can inflate the read latency of downstream applications. Combining both of these concerns with the high-throughput-small-batch scenario, it still seems worthwhile to provide users with a way to do some multi-batch transactions for their source tasks.
This is analogous to consumer-side buffering; yes, by default you probably want events to be available downstream as soon as possible, but in some cases it's still necessary to introduce a small hit in latency in order to keep up with high throughput. And if each batch is relatively small, then heightened memory requirements for consumers shouldn't really be a problem. And for other scenarios, with per-connector offset commit intervals, users can always just set the interval to zero to get exactly the behavior that you're describing. > I'm not sure that setting up a separate Connect cluster is that practical for many Connect users, especially in larger organizations where one group manages the cluster and others manage connectors. I agree; I was mostly responding to what I perceived as the assumption that exactly-once support could be configured on a per-connector basis, which hopefully we've clarified by now is not the case. > Reducing throughput is a technical solution, especially if the only alternative is that the task doesn't run. But IMO it's an impractical technical solution as most users will want a real-time solution, and decreasing connector's throughput is the opposite of this. Are you proposing that this option not be provided to users? I'm afraid if we don't do that then it'll just become an undocumented workaround that a handful of people are aware of (or, worse yet, have to discover for themselves), but most people aren't, even
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
On Mon, May 10, 2021 at 10:26 AM Chris Egerton wrote: > RE transaction boundaries: > > > First, the offset commit interval is currently 60 seconds by default, and > source connectors can produce a lot of records in that time. Until someone > operating a Connect cluster changed that to a much lower value, it's > possible that source connector performance could be significantly impacted. > ... > > At first blush, > controlling transaction boundaries based upon batches seems like it would > work better for high throughput connectors. > > Sorry, what's the concern about performance here? Write throughput for a > transactional producer doesn't suffer as the number of records per > transaction increases. I'm also worried about source connectors that return > small batches with high frequency; those would not fare well with a lower > offset commit interval. > Recall that with Kafka transactions, a consumer in read_committed mode buffers messages that are part of a transaction until that transaction is committed or aborted. When a consumer sees an aborted transaction marker for that transaction, the consumer discards all buffered messages associated with that transaction. When a consumer sees a commit transaction marker, it then forwards those buffered messages that are associated with that transaction to its application. Note the transaction markers and buffered messages are per topic partition. So I have two concerns with using time-based transaction boundaries. The first is easier to understand: consumers using read_committed mode may consume a lot of memory while buffering records that are part of active transactions. In fact, the memory required is a function of the number of records in the transaction in the assigned topic partitions. However, the only way a Connect user can control the number of records in each transaction is by reducing the offset commit interval used for _all connectors_. This means that the Connect user cannot directly limit the size of the transaction (and therefore the buffering memory required by the consumers) but can only limit the maximum duration of the transactions. This seems challenging at best, because the size of the transaction is a function of the throughput and the offset commit interval. My second concern is that applications consuming the topics to which a source connector writes will perceive those writes as having a potentially very high lag w/r/t the connector first seeing in the source system the information for those records. The best case is that the source connector discovers some information, generates a source record and hands it to Connect, and Connect writes that record to the topic just before the offset commit window closes and commits the transaction. A worse case is that the source connector gives the record to Connect just after the offset commit window closes (and the transaction is closed), and the transaction with that record will not be committed for another commit interval. This means the worst case *perceived lag* could be at least the offset commit interval. So if we agree that it will be difficult for some Connect users to choose a one-size-fits-all offset commit interval to work well for source connectors with a range of throughputs and different consumer application requirements, we may need to consider the ability for each connector to control the transaction boundaries, albeit still somewhat indirectly. The question then becomes how to specify the boundaries. 
Record counts alone are insufficient, since a transaction could be 1 record short for some time, resulting in the entire transaction timing out. Time/duration is also not sufficient, since for low perceived lag this should be set as small as is feasible (and thus poor for high throughput). Using a separate transaction for each batch seems like a very worthwhile compromise. First, it reuses something that connector implementations already employ, and which many connectors have configs to tune the behavior to maximize throughput. Second, it requires no additional configuration properties or connector-specific behavior. Third, high-throughput connectors are likely returning large batches, so write amplification for the extra offset record per source partition is likely to have a much smaller impact (assuming that most high throughput connectors have high records-per-source-partition ratios). Likewise, low-throughput connectors will either have very small or infrequent batches that will easily handle the higher write amplification (up to 2x: one offset record for every record). Regardless of the throughput, infrequent batches would have higher lag anyway even without EOS, and EOS transactions coupled to those batches would add minimum overhead. > > > It's true we already have > that constraint now, but transactions and more importantly transaction size > and latency add a completely different aspect to the calculation of how to > set the offset commit interval for a Connect cluster.
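As a concrete reference for the consumer-side behavior described above, a minimal read_committed consumer might look like the following sketch (the broker address, group ID, and topic name are placeholders):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ReadCommittedConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("group.id", "downstream-app");          // placeholder
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Records written in a source task's transaction only become visible
            // here after that transaction commits, which is the source of the
            // perceived lag discussed above.
            props.put("isolation.level", "read_committed");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("source-connector-topic")); // placeholder
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records)
                        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }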
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Tom, Really interesting turn this has taken! Responses inline. > I'm not quite sure I follow what you mean here. Can you explain? AFAICT it doesn't apply in a cluster where all tasks are using fencible producers, but maybe I'm missing something. Imagine this (unlikely but possible) scenario: 1. A connector is created with N tasks and uses the fencible-but-not-transactional producer 2. Task N (as in, the task with the highest ID) is allocated to a worker that then becomes a zombie 3. The connector is reconfigured to use N-1 tasks, and this takes effect on all non-zombie workers in the cluster 4. The connector is reconfigured to enable full-on exactly-once support (i.e., use of the transactional producer) At this point, we would need to know to fence out task N that's running on the zombie worker. This is what is accomplished in the current design with the task count records in the config topic; even if the number of tasks in a connector is decreased, the leader would be aware of the old, higher task count for that connector, and know to fence out that many tasks. I was only noting this for completeness' sake; there's nothing about this requirement that renders your proposal impossible or even significantly more difficult. We'd just have to make sure to do the task count record bookkeeping for connectors regardless of whether they're exactly-once or not, so that if a connector has exactly-once switched on without a cluster roll in the middle, we'd know exactly how many tasks to fence out before bringing up that first round of transactional producers. > That will be the case for the new transactional cluster anyway. That's true, but we do go from three static ACLs (write/describe on a fixed transactional ID, and idempotent write on a fixed cluster) to a dynamic collection of ACLs. In especially large organizations where the people that administrate Connect clusters aren't necessarily the same as the people that create and manage connectors this might cause some friction. Still, since there are benefits to all users (regardless of requirements for exactly-once delivery guarantees) in the form of fencible producers that would, in many if not all circumstances, reduce duplicate writes, it's not out of the question to argue for this change. I also toyed with the question of "If we're going to require these new ACLs unconditionally, what's stopping us from just enabling fully-fledged exactly-once source support by default?". It'd be pretty straightforward to include zombie fencing for free with this change, for example. The only remaining blocker seems to be that the connector needs direct write and read access to the offsets topic that it uses. Ultimately I think it may behoove us to err on the side of reducing the breaking changes here for now and saving them for 4.0 (or some later major release), but would be interested in thoughts from you and others. > Guozhang also has a (possible) use case for a fencing-only producer ( https://issues.apache.org/jira/browse/KAFKA-12693), and as he points out there, you should be able to get these semantics today by calling initTransactions() and then just using the producer as normal (no beginTransaction()/abortTransaction()/commitTransaction()).
I tested this locally and was not met with success; transactional producers do a check right now to ensure that any calls to "KafkaProducer::send" occur within a transaction (see https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L957-L959 and https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L450-L451). Not a blocker, just noting that we'd have to do some legwork to make this workable with the producer API. > In light of that (and assuming you buy these arguments), I wonder how much extra effort it would be to do for EOS-enabled clusters as part of this KIP? The extra effort wouldn't be negligible (expansion of the producer API, more complexity in task count record logic, more thorough upgrade notes), but ultimately I wouldn't object to the proposal because of the extra work involved. What it really comes down to IMO is how aggressive we're willing to be with the breaking changes we make for users. If a good argument can be made for introducing new ACL requirements for every single connector running on 3.0 and beyond, then I'd be happy to fold this into the KIP in exchange for the ability to configure exactly-once support on a per-connector basis. Really enjoying the fresh perspective you're bringing here, especially with regards to the transactional producer internals and Kafka Streams use cases! Cheers, Chris On Fri, May 14, 2021 at 10:07 AM Tom Bentley wrote: > Hi Chris, > > Thanks for the reply. > > "required"/"requested" sounds good to me. Likewise the pre-flight check and > "PUT /{connectorTy
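For anyone who wants to see the client-side check described above in action, a minimal sketch of the failing pattern looks like this (the broker address and transactional ID are placeholders, and the exact exception message may vary by version):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class FencingOnlyProducerCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");    // placeholder
            props.put("transactional.id", "my-fencing-only-id"); // placeholder
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Registers the transactional ID and fences out any older producer
                // instances that used the same ID
                producer.initTransactions();

                // With the current client this throws an IllegalStateException,
                // because the producer is transactional but no transaction has
                // been begun -- the check referenced in the links above
                producer.send(new ProducerRecord<>("test-topic", "key", "value"));
            }
        }
    }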
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Chris, Thanks for the reply. "required"/"requested" sounds good to me. Likewise the pre-flight check and "PUT /{connectorType}/config/validate". The other half is we'd still need to > track the number of tasks for that connector that would need to be fenced > out if/when exactly-once for it were switched on. > I'm not quite sure I follow what you mean here. Can you explain? AFAICT it doesn't apply in a cluster where all tasks are using fencible producers, but maybe I'm missing something. If we had the > intermediate producer you describe at our disposal, and it were in use by > every running source task for a given connector, we could probably enable > users to toggle exactly-once on a per-connector basis, but it would also > require new ACLs for all connectors. > That will be the case for the new transactional cluster anyway. I think there is value to supporting connectors that don't use full-blown transactions in an exactly-once cluster, because the overhead in a fencing producer should be similar to an idempotent producer (which IIRC is about 3% above a non-idempotent producer). That's because we only need to make a single InitProducerIdRequest, and thereafter the epoch check is tiny. If that's right then many people would then be able to use a single cluster for both exactly once and non-exactly once connectors (i.e. it would get rid of the performance cost of running a non-EOS connector in an exactly-once cluster). Only people who cared about the ~3% would need to run "old-style" clusters using unfenced producers. Guozhang also has a (possible) use case for a fencing-only producer ( https://issues.apache.org/jira/browse/KAFKA-12693), and as he points out there, you should be able to get these semantics today by calling initTransactions() and then just using the producer as normal (no beginTransaction()/abortTransaction()/commitTransaction()). In light of that (and assuming you buy these arguments), I wonder how much extra effort it would be to do for EOS-enabled clusters as part of this KIP? Thanks again, Tom On Fri, May 14, 2021 at 2:14 AM Chris Egerton wrote: > Hi Tom, > > I'm fine with an implicit mapping of connector-provided null to > user-exposed UNKNOWN, if the design continues down that overall path. > > Allowing users to assert that a connector should support exactly-once > sounds reasonable; it's similar to the pre-flight checks we already do for > connector configurations such as invoking "Connector::validate" and > ensuring that all of the referenced SMTs, Predicates, and Converter classes > are present on the worker. In fact, I wonder if that's how we could > implement it--as a preflight check. That way, Connector and Task instances > won't even have the chance to fail; if the user states a requirement for > exactly-once support but their connector configuration doesn't meet that > requirement, we can fail the connector creation/reconfiguration request > before even writing the new config to the config topic. We could also add > this support to the "PUT /{connectorType}/config/validate" endpoint so that > users could test exactly-once support for various configurations without > having to actually create or reconfigure a connector. We could still fail > tasks on startup if something slipped by (possibly due to connector > upgrade) but it'd make the UX a bit smoother in most cases to fail faster.
> > Since a possible use of the property is to allow future users to control > exactly-once support on a per-connector basis, I wonder whether a binary > property is sufficient here. Even if a connector doesn't support > exactly-once, there could still be benefits to using a transactional > producer with rounds of zombie fencing; for example, preventing duplicate > task instances from producing data, which could be leveraged to provide > at-most-once delivery guarantees. In that case, we'd want a way to signal > to Connect that the framework should do everything it does to provide > exactly-once source support, but not make the assertion on the connector > config, and we'd end up providing three possibilities to users: required, > best-effort, and disabled. It sounds like right now what we're proposing is > that we expose only the first two and don't allow users to actually disable > exactly-once support on a per-connector basis, but want to leave room for > the third option in the future. With that in mind, "required/not_required" > might not be the best fit. Perhaps "required"/"requested" for now, with > "disabled" as the value that could be implemented later? > > RE: "Is the problem here simply that the zombie fencing provided by the > producer is only available when using transactions, and therefore having a > non-transactional producer in the cluster poses a risk of a zombie not > being fenced?"--that's half of it. The other half is we'd still need to > track the number of tasks for that connector that would need to be fenced > out if/when exactly-once for it were switched on.
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Tom, I'm fine with an implicit mapping of connector-provided null to user-exposed UNKNOWN, if the design continues down that overall path. Allowing users to assert that a connector should support exactly-once sounds reasonable; it's similar to the pre-flight checks we already do for connector configurations such as invoking "Connector::validate" and ensuring that all of the referenced SMTs, Predicates, and Converter classes are present on the worker. In fact, I wonder if that's how we could implement it--as a preflight check. That way, Connector and Task instances won't even have the chance to fail; if the user states a requirement for exactly-once support but their connector configuration doesn't meet that requirement, we can fail the connector creation/reconfiguration request before even writing the new config to the config topic. We could also add this support to the "PUT /{connectorType}/config/validate" endpoint so that users could test exactly-once support for various configurations without having to actually create or reconfigure a connector. We could still fail tasks on startup if something slipped by (possibly due to connector upgrade) but it'd make the UX a bit smoother in most cases to fail faster. Since a possible use of the property is to allow future users to control exactly-once support on a per-connector basis, I wonder whether a binary property is sufficient here. Even if a connector doesn't support exactly-once, there could still be benefits to using a transactional producer with rounds of zombie fencing; for example, preventing duplicate task instances from producing data, which could be leveraged to provide at-most-once delivery guarantees. In that case, we'd want a way to signal to Connect that the framework should do everything it does to provide exactly-once source support, but not make the assertion on the connector config, and we'd end up providing three possibilities to users: required, best-effort, and disabled. It sounds like right now what we're proposing is that we expose only the first two and don't allow users to actually disable exactly-once support on a per-connector basis, but want to leave room for the third option in the future. With that in mind, "required/not_required" might not be the best fit. Perhaps "required"/"requested" for now, with "disabled" as the value that could be implemented later? RE: "Is the problem here simply that the zombie fencing provided by the producer is only available when using transactions, and therefore having a non-transactional producer in the cluster poses a risk of a zombie not being fenced?"--that's half of it. The other half is we'd still need to track the number of tasks for that connector that would need to be fenced out if/when exactly-once for it were switched on. If we had the intermediate producer you describe at our disposal, and it were in use by every running source task for a given connector, we could probably enable users to toggle exactly-once on a per-connector basis, but it would also require new ACLs for all connectors. Even though we're allowed to make breaking changes with the upcoming 3.0 release, I'm not sure the tradeoff is worth it. 
I suppose we could break down exactly-once support into two separate config properties--a worker-level property, that causes all source tasks on the worker to use producers that can be fenced (either full-on transactional producers or "intermediate" producers), and a per-connector property, that toggles whether the connector itself uses a full-on transactional producer or just an intermediate producer (and whether or not zombie fencing is performed for new task configs). This seems like it might be overkill for now, though. As far as the zombie fencing endpoint goes--the behavior will be the same either way w/r/t the exactly.once.source.enabled property. The property will dictate whether the endpoint is used by tasks, but it'll be available for use no matter what. This is how a rolling upgrade becomes possible; even if the leader hasn't been upgraded yet (to set exactly.once.source.enabled to true), it will still be capable of handling fencing requests from workers that have already been upgraded. Cheers, Chris On Wed, May 12, 2021 at 5:33 AM Tom Bentley wrote: > Hi Chris and Randall, > > I can see that for connectors where exactly once is configuration-dependent > it makes sense to use a default method. The problem with having an explicit > UNKNOWN case is we really want connector developers to _not_ use it. That > could mean it's deprecated from the start. Alternatively we could omit it > from the enum and use null to mean unknown (we'd have to check for a null > result anyway), with the contract for the method being that it should > return non-null. Of course, this doesn't remove the ambiguous case, but > avoids the need to eventually remove UNKNOWN in the future. > > I think there's another way for a worker to use the value too: Imagine > you're deploying a connector t
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Chris and Randall, I can see that for connectors where exactly once is configuration-dependent it makes sense to use a default method. The problem with having an explicit UNKNOWN case is we really want connector developers to _not_ use it. That could mean it's deprecated from the start. Alternatively we could omit it from the enum and use null to mean unknown (we'd have to check for a null result anyway), with the contract for the method being that it should return non-null. Of course, this doesn't remove the ambiguous case, but avoids the need to eventually remove UNKNOWN in the future. I think there's another way for a worker to use the value too: Imagine you're deploying a connector that you need to be exactly once. It's awkward to have to query the REST API to determine that exactly once was working, especially if you need to do this after config changes too. What you actually want is to make an EOS assertion, via a connector config (e.g. require.exactly.once=true, or perhaps exactly.once=required/not_required), which would fail the connector/task if exactly once could not be provided. The not_required case wouldn't disable the transactional runtime environment, simply not guarantee that it was providing EOS. It would, however, leave the door open to supporting mixed EOS/non-transactional deployments in the cluster in the future, if that became possible (i.e. we could retrospectively make not_required mean no transactions). On the subject of why it's not possible to enable exactly once on a per-connector basis: Is the problem here simply that the zombie fencing provided by the producer is only available when using transactions, and therefore having a non-transactional producer in the cluster poses a risk of a zombie not being fenced? This makes me wonder whether there's a case for a producer with zombie fencing that is not transactional (intermediate between idempotent and transactional producer). IIUC this would need to make an InitProducerId request and use the PID in produce requests, but could dispense with the other transactional RPCs. If such a thing existed would the zombie fencing it provided be sufficient to provide safe semantics for running a non-EOS connector in an EOS-capable cluster? The endpoint for zombie fencing: It's not described how this works when exactly.once.source.enabled=false. Cheers, Tom
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Thanks for your thoughts, Randall. Responses below: RE exactly-once support in the REST API: > The worker has to do (or plan to do) something with the information about a connector's support for EOS, whether that's via an annotation or a method. Otherwise, what's the point of requiring the connector to expose this information? But the problem I see with this whole concept is that there will still be ambiguity. For example, if the method returns `UNKNOWN` by default, the connector could still be written in a way where EOS does work with the connector. Yet what are users supposed to do when they see "UNKNOWN"? I think the idea here is that it'd be nice to have a simple, common interface for connector developers to expose this information to users. The natural alternative is to just add a note to the documentation for that specific connector, but then we've lost the "common" part of that, and possibly the "simple" (depending on how searchable the docs for the connector are and how complex the logic of exactly-once support is). For some sink connectors out there exactly-once support is complex enough to warrant things like entire pages of documentation and even flowchart diagrams, and as that complexity rises, user experience gets degraded. As far as how to handle "UNKNOWN" goes--users can go the usual routes that they already can for sink connectors: search documentation and reach out to connector developers. However, once the question of exactly-once support comes to these connector developers, they will also likely become aware of the new API for surfacing this information to users, and can take advantage of it to save themselves the trouble of having to personally answer these questions in the future. Any connector developers who are already aware of exactly-once support for source connectors coming to Connect can leverage the new API either instead of or in addition to expanding docs for their connectors. This can be thought of almost like a non-expiring caching layer for documentation; yes, there are going to be some cache misses (i.e., "UNKNOWN"), but whenever one of those misses occurs, there's now an option to cache the requested information in a more convenient, standard location for future users. RE transaction boundaries: > First, the offset commit interval is currently 60 seconds by default, and source connectors can produce a lot of records in that time. Until someone operating a Connect cluster changed that to a much lower value, it's possible that source connector performance could be significantly impacted. ... > At first blush, controlling transaction boundaries based upon batches seems like it would work better for high throughput connectors. Sorry, what's the concern about performance here? Write throughput for a transactional producer doesn't suffer as the number of records per transaction increases. I'm also worried about source connectors that return small batches with high frequency; those would not fare well with a lower offset commit interval. > It's true we already have that constraint now, but transactions and more importantly transaction size and latency add a completely different aspect to the calculation of how to set the offset commit interval for a Connect cluster. ... > If we're committing transactions every 60 seconds (or even as frequently as every 5 seconds), then the _perceived lag_ will be significantly higher with transactions than without. That's a fair point.
High-throughput connectors are likely to benefit from higher commit intervals so that they have to pause to commit offsets, and produce offset records, less frequently. Low-latency connectors are likely to benefit from lower commit intervals in order to shorten the time between source records being produced to Kafka and their transactions being committed. There's no reason to assume that there's a one-size-fits-all offset commit interval for an entire cluster once offset commit becomes a requirement for records to be visible to downstream consumers. > Have you considered having the worker create a separate transaction for each batch of records returned by the connector? I had not considered this, but for reasons outlined above regarding performance with high-throughput connectors, I'm hesitant to put it into play by default. Given the conflicting requirements of high-throughput and low-latency connectors with regards to offset commit, I do agree that for some connectors, the most useful default behavior would be to commit offsets as soon as possible, and committing once for every batch returned from "SourceTask::poll" is a great way to allow that. The most straightforward way to accommodate these conflicting cases seems to be to just go ahead and allow per-connector offset commit intervals. That way, the user gets to define an approximate upper bound on the time between a record being produced and its transaction being committed--and if they want the upper bound to be zero, they can just configure
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Thanks once again for the KIP and the updates, Chris. As mentioned in my prior email, here are my more general comments/questions (other than my earlier question about the transaction boundaries). Hopefully these are readable: The worker-level offset.flush.timeout.ms property will be ignored for > exactly-once source tasks. They will be allowed to take as long as > necessary to complete an offset commit, since the cost of failure at that > point is to fail the source task. Currently, all source task offset commits > take place on a single shared worker-global thread. In order to support > source task commits without a timeout, but also prevent laggy tasks from > disrupting the availability of other tasks on the cluster, the worker will > be modified to permit simultaneous source task offset commits. This seems to contradict the earlier quote. Specifically, the first quote above states that transaction boundaries are dictated by offset flushes, which is controlled by the `offset.flush.interval.ms` property. However, the second quote says the `offset.flush.timeout.ms` property will be ignored for EOS source tasks. I must be missing something. It may take longer than the transaction timeout for a task to flush all of > its records to Kafka. In this case, there are two actions that users can > take to nurse their connector back to health: reconfigure it to produce > records with a lower throughput, or increase the transaction timeout for > the producers used by the connector. Does it seem realistic or practical to suggest to reduce the source connector's throughput? Finally, it allows users to limit the effect that hanging transactions on > an offsets topic will have. If tasks A and B use the same offsets topic, > and task A initiates a transaction on that offsets topic right before task > B starts up, then task A dies suddenly without committing its transaction, > task B will have to wait for that transaction to time out before it can > read to the end of the offsets topic. If the transaction timeout is set > very high for task A (to accommodate bursts of high throughput, for > example), this will block task B from processing any data for a long time. > Although this scenario may be unavoidable in some cases, using a dedicated > offsets topic for each connector should allow cluster administrators to > isolate the blast radius of a hanging transaction on an offsets topic. This > way, although tasks of the same connector may still interfere with each > other, they will at least not interfere with tasks of other connectors. > This should be sufficient for most multitenant environments. Do you mean to use "task A" and "task B" here? Do they imply tasks from the same connector? If so, then won't the offsets from both of those tasks still be written to the same connector-specific offset topic, and suffer from the potential blocking issue mentioned above? While it's useful to call this out, I'm not sure how that example helps support the motivation for a separate per-connector offset topic. OTOH, if these were tasks from _different_ connectors, then it becomes clear that the offsets from one source connector using a connector-specific offsets topic will never block the offsets from another connector using a different connector-specific offsets topic. Thus, having each connector use separate connector-specific offset topics at least avoids the problem of one connector's tasks blocking the tasks of the other connector just because of transaction commit timeout issues. 
Regardless of whether exactly.once.source.enabled is set to true for the > worker, if a connector configuration contains a value for the > offsets.storage.topic property, it will use an offsets topic with that > name on the Kafka cluster that it produces data to (which may be different > from the one that hosts the worker's global offsets topic). It's implied but not explicitly stated that this will result in duplicating the offsets for EOS source connectors: the worker will continue to track all source offsets in its own offsets topic, and EOS-enabled source connectors will also track their own offsets in connector-specific offsets topics. It might be worth making that more obvious, and clarifying which will be used upon connector restarts. If a connector is explicitly or implicitly configured to use a separate > offsets topic but that topic does not exist yet, task and connector > instances will automatically try to create the topic before startup. Are you suggesting that the source connector and task implementations be responsible for this? Or are you suggesting that the worker will be responsible for creating the offsets topics on behalf of source connector and task implementations (using an admin client per the configurations), just like the worker currently does for the cluster's offsets topic? Please clarify this in the KIP. If a worker is active and does not have support for exactly-once delivery... Do you think it's worthwhile add
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Thanks again for the continued improvements to this KIP, Chris. I have a number of questions that I'll enumerate in a subsequent email, but first I wanted to make a higher-level comment. I don't know what to make about the proposal to rely upon the existing offset commit interval as the way to control the transaction boundaries. Specifically, the KIP says: > The timing for offset commits will remain the same as it is today; they > will be triggered periodically on a fixed interval that users can adjust > via the offset.flush.interval.ms property. Transaction boundaries and > record batches will be defined by these offset commits; expanded support > for transaction boundary definition can be added later and is noted in the > "future work" section. > It's not clear to me why we're coupling the transaction boundaries to the existing offset flush behavior. (BTW, I didn't see a rejected alternative that addresses this. There is one about "Connector-defined transaction boundaries", but I'm not suggesting that we let connectors explicitly control the transaction boundaries.) First, the offset commit interval is currently 60 seconds by default, and source connectors can produce a lot of records in that time. Until someone operating a Connect cluster changed that to a much lower value, it's possible that source connector performance could be significantly impacted. Second, there is only one worker-level setting for the offset commit interval, which means that this one setting would have to be used for source connectors with high and low throughput. It's true we already have that constraint now, but transactions and more importantly transaction size and latency add a completely different aspect to the calculation of how to set the offset commit interval for a Connect cluster. Third and perhaps most importantly, a record produced by a source connector and written to the Kafka topic outside of a transaction is visible to a consumer (worst case) immediately after that record has been replicated to all of the in-sync replicas. However, if the producer writes that record in a transaction, then that record will be visible to a consumer (using `isolation.level=read_committed`) only after that transaction is committed. If we're committing transactions every 60 seconds (or even as frequently as every 5 seconds), then the _perceived lag_ will be significantly higher with transactions than without. And that seems to go against the last two goals of the KIP: "Minimize gotchas and potential footguns" and "Overall, make this a feature that gives people joy to use, not pain". Have you considered having the worker create a separate transaction for each batch of records returned by the connector? Yes we'd still want to add the appropriate offset topic record(s) at the end of each transaction, but this seems to align much more closely with the existing mechanisms connector developers have and any existing configuration tuning options connector developers and their users already have. And the existing worker-level source offsets could still be flushed with the current behavior, since the worker-level (non-transactional) offsets don't need to be flushed at the same time as the transaction boundaries. At first blush, controlling transaction boundaries based upon batches seems like it would work better for high throughput connectors. 
It is true that low-throughput connectors would result in higher write amplification (worst case being 2x when each batch returns a single record), but for low-throughput connectors this seems like it might be an acceptable tradeoff if EOS is really needed, or if not then EOS can be disabled for this connector. This would add an implicit mapping between the batches and transactions, but I still think we'd want to eventually allow a source connector to _explicitly_ control the transaction boundaries within a single batch. Thanks, and best regards. Randall On Tue, May 4, 2021 at 4:11 PM Chris Egerton wrote: > Hi all, > > Good news everyone! I've reworked the design one more time and hopefully > some of the improvements here should make the proposal more palatable. > TL;DR: > > - Rolling upgrades are now possible, in at most two phases; workers will > first have to be given a binary upgrade to 3.0 (the targeted version for > this feature) which can be a rolling upgrade, and then a rolling upgrade to > enable exactly-once source support in a cluster should be possible with no > anticipated downtime for source connectors or their tasks > - Offset topic migration is completely removed in favor of fallback to the > global offsets topic (much simpler!) > - One backwards-incompatible change is introduced: the leader will be > required to use a transactional producer for writes to the config topic > regardless of whether exactly-once support is enabled on the cluster. > Technically we could gate this behind a config property but since the > benefits of a transactional producer actually exte
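To make the batch-per-transaction idea above concrete, the worker-side loop would look roughly like the following sketch (heavily simplified; the conversion and offset-serialization helpers are illustrative placeholders, not real Connect APIs):

    import java.util.List;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    // Illustrative only: one producer transaction per batch returned from poll().
    public abstract class PerBatchTransactionLoop {

        void run(SourceTask task, KafkaProducer<byte[], byte[]> producer) throws InterruptedException {
            producer.initTransactions();
            while (true) {
                List<SourceRecord> batch = task.poll();
                if (batch == null || batch.isEmpty())
                    continue;

                producer.beginTransaction();
                for (SourceRecord record : batch)
                    producer.send(convert(record));
                // In practice there would be one offset record per source partition
                // touched by the batch; data and offsets then commit atomically.
                for (ProducerRecord<byte[], byte[]> offsetRecord : offsetRecordsFor(batch))
                    producer.send(offsetRecord);
                producer.commitTransaction();
            }
        }

        // Placeholders for the converter and offset-serialization logic a real worker performs
        abstract ProducerRecord<byte[], byte[]> convert(SourceRecord record);
        abstract List<ProducerRecord<byte[], byte[]>> offsetRecordsFor(List<SourceRecord> batch);
    }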
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Thanks for continuing to work on this KIP, Chris. On Fri, May 7, 2021 at 12:58 PM Chris Egerton wrote: > b) An annotation is a really cool way to allow connector developers to > signal eligibility for exactly-once to Connect (and possibly, through > Connect, to users). Like you mention, connectors both with and without the > annotation could still run on both pre- and post-upgrade workers with no > worries about missing class errors. And it only requires a single-line > change to each connector. My only concern is that with some connectors, > exactly-once support might not be all-or-nothing and might be dependent on > how the connector is configured. For a practical example, Confluent's JDBC > source connector would likely be eligible for exactly-once when run in > incrementing mode (where it tracks offsets based on the value of a > monotonically-increasing table column), but not in bulk mode (where it > doesn't provide offsets for its records at all). With that in mind, what do > you think about a new "exactlyOnce()" method to the SourceConnector class > that can return a new ExactlyOnce enum with options of "SUPPORTED", > "UNSUPPORTED", and "UNKNOWN", with a default implementation that returns > "UNKNOWN"? This can be invoked by Connect after start() has been called to > give the connector a chance to choose its response based on its > configuration. > As far as what to do with this information goes--I think it'd go pretty > nicely in the response from the GET /connectors/{connector} endpoint, which > currently includes information about the connector's name, configuration, > and task IDs. We could store the info in the config topic in the same > record that contains the connector's configuration whenever a connector is > (re)configured, which would guarantee that the information provided about > eligibility for exactly-once matches the configuration it was derived from, > and would present no compatibility issues (older workers would be able to > read records written by new workers and vice-versa). Thoughts? > The worker has to do (or plan to do) something with the information about a connector's support for EOS, whether that's via an annotation or a method. Otherwise, what's the point of requiring the connector to expose this information? But the problem I see with this whole concept is that there will still be ambiguity. For example, if the method returns `UNKNOWN` by default, the connector could still be written in a way where EOS does work with the connector. Yet what are users supposed to do when they see "UNKNOWN"?
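For reference, the API shape being debated here would look roughly like the following sketch (the method name, enum constants, and UNKNOWN default all come from the proposal quoted above; none of it is final):

    import org.apache.kafka.connect.connector.Connector;

    // Sketch of the proposed addition to SourceConnector; nothing here is final API.
    public abstract class SourceConnector extends Connector {

        public enum ExactlyOnce {
            SUPPORTED,
            UNSUPPORTED,
            UNKNOWN
        }

        // Invoked after start(), so a connector can answer based on its
        // configuration (e.g. a JDBC source in incrementing mode vs. bulk mode)
        public ExactlyOnce exactlyOnce() {
            return ExactlyOnce.UNKNOWN;
        }
    }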
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Tom, Thanks for taking a look! Really appreciate it. Answers below: 1. a) This is possible, but it would likely require reworking the rebalance protocol, and would not provide comprehensive guarantees about exactly-once delivery. If the group coordination protocol used for rebalancing is also leveraged to determine whether every worker has exactly-once source support enabled, it will by definition fail to provide information about zombie workers that are up and actively processing data, but have become disconnected from the group coordinator. Given the additional effort required and the potential for false positives (which could lead to a pretty bad user experience and undermine trust in the feature) I'm not sure it'd be worth it to try to add that kind of automatic detection, but I'd be interested in your thoughts, especially if there's an easier way to get what we want here. b) An annotation is a really cool way to allow connector developers to signal eligibility for exactly-once to Connect (and possibly, through Connect, to users). Like you mention, connectors both with and without the annotation could still run on both pre- and post-upgrade workers with no worries about missing class errors. And it only requires a single-line change to each connector. My only concern is that with some connectors, exactly-once support might not be all-or-nothing and might be dependent on how the connector is configured. For a practical example, Confluent's JDBC source connector would likely be eligible for exactly-once when run in incrementing mode (where it tracks offsets based on the value of a monotonically-increasing table column), but not in bulk mode (where it doesn't provide offsets for its records at all). With that in mind, what do you think about a new "exactlyOnce()" method to the SourceConnector class that can return a new ExactlyOnce enum with options of "SUPPORTED", "UNSUPPORTED", and "UNKNOWN", with a default implementation that returns "UNKNOWN"? This can be invoked by Connect after start() has been called to give the connector a chance to choose its response based on its configuration. As far as what to do with this information goes--I think it'd go pretty nicely in the response from the GET /connectors/{connector} endpoint, which currently includes information about the connector's name, configuration, and task IDs. We could store the info in the config topic in the same record that contains the connector's configuration whenever a connector is (re)configured, which would guarantee that the information provided about eligibility for exactly-once matches the configuration it was derived from, and would present no compatibility issues (older workers would be able to read records written by new workers and vice-versa). Thoughts? 2. You're correct; because there is no blocking, we have no guarantees that the global offsets topic is ever actually populated, and if there are issues with populating it and then a hard downgrade is necessary, a flood of duplicates could result. My thinking here was that we'd already be taking a hit in performance by introducing transactions and switching from an asynchronous offset commit model to a synchronous one; I didn't want to slow things down any further by having to write to not one, but two different offsets topics. 3. 
Added to the KIP but reiterating here: the authorizations required for `Admin::fenceProducers` will be the same as the ones required to use a transactional producer; specifically, grants for the Write and Describe operations on all of the TransactionalId resources by the user, and a grant for the IdempotentWrite operation on the Cluster resource. 4. Added to the KIP but reiterating here: the name of the topic will be the value of the "offsets.storage.topic" property in the worker config. Cheers, and thanks again for the review! Chris On Thu, May 6, 2021 at 12:14 PM Tom Bentley wrote: > Hi Chris, > > Thanks for this KIP. I've taken an initial look and have a few questions. > > 1. The doc for exactly.once.source.enabled says "Note that this must be > enabled on every worker in a cluster in order for exactly-once delivery to > be guaranteed, and that some source connectors may still not be able to > provide exactly-once delivery guarantees even with this support enabled." > a) Could we detect when only some workers in the cluster had support > enabled, and make it apparent that exactly-once wasn't guaranteed? > b) I'm wondering how users will be able to reason about when a connector is > really giving them exactly-once. I think this is defined in the limitations > section much later on, right? It seems to require somewhat detailed > knowledge of how the connector is implemented. And right now there's no > standard way for a connector author to advertise their connector as being > compatible. I wonder if we could tackle this in a compatible way using an > annotation on the Connector class. That shouldn't cause problems for older > versions of Co
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Chris, Thanks for this KIP. I've taken an initial look and have a few questions. 1. The doc for exactly.once.source.enabled says "Note that this must be enabled on every worker in a cluster in order for exactly-once delivery to be guaranteed, and that some source connectors may still not be able to provide exactly-once delivery guarantees even with this support enabled." a) Could we detect when only some workers in the cluster had support enabled, and make it apparent that exactly-once wasn't guaranteed? b) I'm wondering how users will be able to reason about when a connector is really giving them exactly-once. I think this is defined in the limitations section much later on, right? It seems to require somewhat detailed knowledge of how the connector is implemented. And right now there's no standard way for a connector author to advertise their connector as being compatible. I wonder if we could tackle this in a compatible way using an annotation on the Connector class. That shouldn't cause problems for older versions of Connect running connectors with the annotation (it's not an error for an annotation type to not be present at runtime), but would allow the support for exactly-once to be apparent and perhaps even exposed through the connector status REST endpoint. It completely relies on the connector author honouring the contract, of course, but it doesn't have the compatibility problems of using a marker interface, for example. 2. About the post-transaction offset commit: " This will be handled on a separate thread from the task’s work and offset commit threads, and should not block or interfere with the task at all." If there's no blocking then how can we be sure that the write to the global offsets topic ever actually happens? If it never happens then presumably in case of a hard downgrade we could see arbitrarily many duplicates? I don't necessarily see this as a show-stopper, more I'm trying to understand what's possible with this design. 3. What authorization is needed for Admin.fenceProducers()? 4. Maybe I missed it, but when a per-connector offsets storage topic is created implicitly what will it be called? Cheers, Tom On Tue, May 4, 2021 at 10:26 PM Chris Egerton wrote: > Hi all, > > Good news everyone! I've reworked the design one more time and hopefully > some of the improvements here should make the proposal more palatable. > TL;DR: > > - Rolling upgrades are now possible, in at most two phases; workers will > first have to be given a binary upgrade to 3.0 (the targeted version for > this feature) which can be a rolling upgrade, and then a rolling upgrade to > enable exactly-once source support in a cluster should be possible with no > anticipated downtime for source connectors or their tasks > - Offset topic migration is completely removed in favor of fallback to the > global offsets topic (much simpler!) > - One backwards-incompatible change is introduced: the leader will be > required to use a transactional producer for writes to the config topic > regardless of whether exactly-once support is enabled on the cluster.
> Technically we could gate this behind a config property but since the > benefits of a transactional producer actually extend beyond exactly-once > source support (we can now ensure that there's only one writer to the > config topic at any given time, which isn't guaranteed with the current > model) and the cost to accommodate it is fairly low (a handful of > well-defined and limited-scope ACLs), I erred on the side of keeping things > simple > > Looking forward to the next round of review and really hoping we can get > the ball rolling in time for this to land with 3.0! > > Cheers, > > Chris > > On Mon, Apr 12, 2021 at 7:51 AM Chris Egerton wrote: > > > Hi Randall, > > > > After thinking things over carefully, I've done some reworking of the > > design. Instead of performing zombie fencing during rebalance, the leader > > will expose an internal REST endpoint that will allow workers to request > a > > round of zombie fencing on demand, at any time. Workers will then hit > this > > endpoint after starting connectors and after task config updates for > > connectors are detected; the precise details of this are outlined in the > > KIP. If a round of fencing should fail for any reason, the worker will be > > able to mark its Connector failed and, if the user wants to retry, they > can > > simply restart the Connector via the REST API (specifically, the POST > > /connectors/{connector}/restart endpoint). > > > > The idea I'd been playing with to allow workers to directly write to the > > config topic seemed promising at first, but it allowed things to get > pretty > > hairy for users if any kind of rebalancing bug took place and two workers > > believed they owned the same Connector object. > > > > I hope this answers any outstanding questions and look forward to your > > thoughts. > > > > Cheers, > > > > Chris > > > > On Mon, Mar 22, 2021 at 4:38 PM Chris Eger
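For what it's worth, the annotation idea from point 1(b) above could be as small as the following sketch (the annotation name and package are invented purely for illustration):

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    // Runtime retention means newer workers can discover it via reflection,
    // while older workers simply never look for it, so no errors occur.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface ExactlyOnceSource {
    }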
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi all, Good news everyone! I've reworked the design one more time and hopefully some of the improvements here should make the proposal more palatable. TL;DR: - Rolling upgrades are now possible, in at most two phases; workers will first have to be given a binary upgrade to 3.0 (the targeted version for this feature) which can be a rolling upgrade, and then a rolling upgrade to enable exactly-once source support in a cluster should be possible with no anticipated downtime for source connectors or their tasks - Offset topic migration is completely removed in favor of fallback to the global offsets topic (much simpler!) - One backwards-incompatible change is introduced: the leader will be required to use a transactional producer for writes to the config topic regardless of whether exactly-once support is enabled on the cluster. Technically we could gate this behind a config property but since the benefits of a transactional producer actually extend beyond exactly-once source support (we can now ensure that there's only one writer to the config topic at any given time, which isn't guaranteed with the current model) and the cost to accommodate it is fairly low (a handful of well-defined and limited-scope ACLs), I erred on the side of keeping things simple Looking forward to the next round of review and really hoping we can get the ball rolling in time for this to land with 3.0! Cheers, Chris On Mon, Apr 12, 2021 at 7:51 AM Chris Egerton wrote: > Hi Randall, > > After thinking things over carefully, I've done some reworking of the > design. Instead of performing zombie fencing during rebalance, the leader > will expose an internal REST endpoint that will allow workers to request a > round of zombie fencing on demand, at any time. Workers will then hit this > endpoint after starting connectors and after task config updates for > connectors are detected; the precise details of this are outlined in the > KIP. If a round of fencing should fail for any reason, the worker will be > able to mark its Connector failed and, if the user wants to retry, they can > simply restart the Connector via the REST API (specifically, the POST > /connectors/{connector}/restart endpoint). > > The idea I'd been playing with to allow workers to directly write to the > config topic seemed promising at first, but it allowed things to get pretty > hairy for users if any kind of rebalancing bug took place and two workers > believed they owned the same Connector object. > > I hope this answers any outstanding questions and look forward to your > thoughts. > > Cheers, > > Chris > > On Mon, Mar 22, 2021 at 4:38 PM Chris Egerton wrote: > >> Hi Randall, >> >> No complaints about email size from me. Let's dive in! >> >> 1. Sure! Especially important in my mind is that this is already possible >> with Connect as it is today, and users can benefit from this with or >> without the expanded exactly-once souce support we're trying to add with >> this KIP. I've added that info to the "Motivation" section and included a >> brief overview of the idempotent producer in the "Background and >> References" section. >> >> 2. I actually hadn't considered enabling exactly-once source support by >> default. Thinking over it now, I'm a little hesitant to do so just because, >> even with the best testing out there, it's a pretty large change and it >> seems safest to try to make it opt-in in case there's unanticipated >> fallout. Then again, when incremental cooperative rebalancing was >> introduced, it was made opt-out instead of opt-in. 
However, ICR came with >> lower known risk of breaking existing users' setups; we know for a fact >> that, if you haven't granted your worker or connector principals some ACLs >> on Kafka, your connectors will fail. In an ideal world people would >> carefully read upgrade notes and either grant those permissions or disable >> the feature before upgrading their Connect cluster to 3.0, but if they >> don't, they'll be in for a world of hurt. Overall I'd be more comfortable >> letting this feature incubate for a little bit to let everyone get familiar >> with it before possibly enabling it in 4.0 by default; what do you think? >> >> 3. I didn't think too long about the name for the offsets topic property; >> it seemed pretty unlikely that it'd conflict with existing connector >> property names. One alternative could be to follow the idiom established by >> KIP-458 and include the word "override" in there somewhere, but none of the >> resulting names seem very intuitive ("offsets.storage.override.topic" seems >> the best to me but even that isn't super clear). Happy to field suggestions >> if anyone has any alternatives they'd like to propose. >> >> 4. I _really_ wanted to enable per-connector toggling of this feature for >> the exact reasons you've outlined. There's already a couple cases where >> some piece of functionality was introduced that users could only control at >> the worker config level and, later, effort had to
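For readers wondering what "a handful of well-defined and limited-scope ACLs" might look like in practice, the sketch below grants WRITE and DESCRIBE on a transactional ID to the worker principal via the AdminClient API. The principal name, broker address, and the assumption that the transactional ID equals the Connect cluster's group.id ("connect-cluster") are all illustrative; the authoritative list of required ACLs is the one in the KIP.

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.common.acl.AccessControlEntry;
    import org.apache.kafka.common.acl.AclBinding;
    import org.apache.kafka.common.acl.AclOperation;
    import org.apache.kafka.common.acl.AclPermissionType;
    import org.apache.kafka.common.resource.PatternType;
    import org.apache.kafka.common.resource.ResourcePattern;
    import org.apache.kafka.common.resource.ResourceType;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    public class GrantLeaderTxnAcls {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // cluster hosting the config topic (assumed)
            try (Admin admin = Admin.create(props)) {
                // Transactional ID assumed to default to the Connect cluster's group.id
                ResourcePattern txnId = new ResourcePattern(
                        ResourceType.TRANSACTIONAL_ID, "connect-cluster", PatternType.LITERAL);
                List<AclBinding> acls = new ArrayList<>();
                for (AclOperation op : List.of(AclOperation.WRITE, AclOperation.DESCRIBE)) {
                    acls.add(new AclBinding(txnId,
                            new AccessControlEntry("User:connect", "*", op, AclPermissionType.ALLOW)));
                }
                admin.createAcls(acls).all().get(); // blocks until the ACLs have been created
            }
        }
    }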
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Randall, After thinking things over carefully, I've done some reworking of the design. Instead of performing zombie fencing during rebalance, the leader will expose an internal REST endpoint that will allow workers to request a round of zombie fencing on demand, at any time. Workers will then hit this endpoint after starting connectors and after task config updates for connectors are detected; the precise details of this are outlined in the KIP. If a round of fencing should fail for any reason, the worker will be able to mark its Connector failed and, if the user wants to retry, they can simply restart the Connector via the REST API (specifically, the POST /connectors/{connector}/restart endpoint). The idea I'd been playing with to allow workers to directly write to the config topic seemed promising at first, but it allowed things to get pretty hairy for users if any kind of rebalancing bug took place and two workers believed they owned the same Connector object. I hope this answers any outstanding questions and look forward to your thoughts. Cheers, Chris On Mon, Mar 22, 2021 at 4:38 PM Chris Egerton wrote: > Hi Randall, > > No complaints about email size from me. Let's dive in! > > 1. Sure! Especially important in my mind is that this is already possible > with Connect as it is today, and users can benefit from this with or > without the expanded exactly-once souce support we're trying to add with > this KIP. I've added that info to the "Motivation" section and included a > brief overview of the idempotent producer in the "Background and > References" section. > > 2. I actually hadn't considered enabling exactly-once source support by > default. Thinking over it now, I'm a little hesitant to do so just because, > even with the best testing out there, it's a pretty large change and it > seems safest to try to make it opt-in in case there's unanticipated > fallout. Then again, when incremental cooperative rebalancing was > introduced, it was made opt-out instead of opt-in. However, ICR came with > lower known risk of breaking existing users' setups; we know for a fact > that, if you haven't granted your worker or connector principals some ACLs > on Kafka, your connectors will fail. In an ideal world people would > carefully read upgrade notes and either grant those permissions or disable > the feature before upgrading their Connect cluster to 3.0, but if they > don't, they'll be in for a world of hurt. Overall I'd be more comfortable > letting this feature incubate for a little bit to let everyone get familiar > with it before possibly enabling it in 4.0 by default; what do you think? > > 3. I didn't think too long about the name for the offsets topic property; > it seemed pretty unlikely that it'd conflict with existing connector > property names. One alternative could be to follow the idiom established by > KIP-458 and include the word "override" in there somewhere, but none of the > resulting names seem very intuitive ("offsets.storage.override.topic" seems > the best to me but even that isn't super clear). Happy to field suggestions > if anyone has any alternatives they'd like to propose. > > 4. I _really_ wanted to enable per-connector toggling of this feature for > the exact reasons you've outlined. 
There's already a couple cases where > some piece of functionality was introduced that users could only control at > the worker config level and, later, effort had to be made in order to add > per-connector granularity: key/value converters and connector Kafka clients > are the two at the top of my head, and there may be others. So if at all > possible, it'd be great if we could support this. The only thing standing > in the way is that it allows exactly-once delivery to be compromised, which > in my estimation was unacceptable. I'm hoping we can make this feature > great enough that it'll work with most if not all source connectors out > there, and users won't even want to toggle it on a per-connector basis. > Otherwise, we'll have to decide between forcing users to split their > connectors across two Connect clusters (one with exactly-once source > enabled, one with it disabled), which would be awful, or potentially seeing > duplicate record delivery for exactly-once connectors, which is also awful. > > 5. Existing source connectors won't necessarily have to be configured with > "consumer.override" properties, but there are a couple of cases where that > will be necessary: > > a. Connector is running against against a secured Kafka cluster and the > principal configured in the worker via the various "consumer." properties > (if there is one) doesn't have permission to access the offsets topic that > the connector will use. If no "consumer.override." properties are specified > in this case, the connector and its tasks will fail. > > b. Connector is running against a separate Kafka cluster from the one > specified in the worker's config with the "bootstrap.servers" property (or > the "consumer.
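The retry path Chris mentions is just a plain POST against the worker's REST API. A minimal sketch using the JDK HTTP client (the worker host/port and connector name are assumed):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class RestartConnector {
        public static void main(String[] args) throws Exception {
            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:8083/connectors/my-source-connector/restart"))
                    .POST(HttpRequest.BodyPublishers.noBody())
                    .build();
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            // A 2xx status indicates the worker accepted the restart request
            System.out.println("Restart returned HTTP " + response.statusCode());
        }
    }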
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Randall, No complaints about email size from me. Let's dive in! 1. Sure! Especially important in my mind is that this is already possible with Connect as it is today, and users can benefit from this with or without the expanded exactly-once source support we're trying to add with this KIP. I've added that info to the "Motivation" section and included a brief overview of the idempotent producer in the "Background and References" section. 2. I actually hadn't considered enabling exactly-once source support by default. Thinking over it now, I'm a little hesitant to do so just because, even with the best testing out there, it's a pretty large change and it seems safest to try to make it opt-in in case there's unanticipated fallout. Then again, when incremental cooperative rebalancing was introduced, it was made opt-out instead of opt-in. However, ICR came with lower known risk of breaking existing users' setups; we know for a fact that, if you haven't granted your worker or connector principals some ACLs on Kafka, your connectors will fail. In an ideal world people would carefully read upgrade notes and either grant those permissions or disable the feature before upgrading their Connect cluster to 3.0, but if they don't, they'll be in for a world of hurt. Overall I'd be more comfortable letting this feature incubate for a little bit to let everyone get familiar with it before possibly enabling it in 4.0 by default; what do you think? 3. I didn't think too long about the name for the offsets topic property; it seemed pretty unlikely that it'd conflict with existing connector property names. One alternative could be to follow the idiom established by KIP-458 and include the word "override" in there somewhere, but none of the resulting names seem very intuitive ("offsets.storage.override.topic" seems the best to me but even that isn't super clear). Happy to field suggestions if anyone has any alternatives they'd like to propose. 4. I _really_ wanted to enable per-connector toggling of this feature for the exact reasons you've outlined. There's already a couple cases where some piece of functionality was introduced that users could only control at the worker config level and, later, effort had to be made in order to add per-connector granularity: key/value converters and connector Kafka clients are the two at the top of my head, and there may be others. So if at all possible, it'd be great if we could support this. The only thing standing in the way is that it allows exactly-once delivery to be compromised, which in my estimation was unacceptable. I'm hoping we can make this feature great enough that it'll work with most if not all source connectors out there, and users won't even want to toggle it on a per-connector basis. Otherwise, we'll have to decide between forcing users to split their connectors across two Connect clusters (one with exactly-once source enabled, one with it disabled), which would be awful, or potentially seeing duplicate record delivery for exactly-once connectors, which is also awful. 5. Existing source connectors won't necessarily have to be configured with "consumer.override" properties, but there are a couple of cases where that will be necessary: a. Connector is running against a secured Kafka cluster and the principal configured in the worker via the various "consumer." properties (if there is one) doesn't have permission to access the offsets topic that the connector will use. If no "consumer.override." 
properties are specified in this case, the connector and its tasks will fail. b. Connector is running against a separate Kafka cluster from the one specified in the worker's config with the "bootstrap.servers" property (or the "consumer.bootstrap.servers" property, if specified). For example, the connector is configured with a "producer.override.bootstrap.servers" property. If no corresponding "consumer.override.bootstrap.servers" property is specified by the connector (or one is provided but doesn't match the one used for the producer), bad things will happen--best case, the connector refuses to start up because it's configured with a separate offsets topic and no offsets migration is able to take place; worst case, the connector starts reading offsets from a stale topic and every time a task is restarted, duplicate records ensue. Since this can compromise delivery guarantees I think action is worth taking here, and I'm glad you've raised this scenario. Proposed approach: If the user hasn't specified a "consumer.override.bootstrap.servers" property in their connector config and the "bootstrap.servers" property that we'd configure its consumer with (based on either the "consumer.bootstrap.servers" worker property or, if not provided, the "bootstrap.servers" worker property) is different from the "bootstrap.servers" that we'd configure its producer with, we can log a warning but then automatically configure the consumer with the "bootstrap.servers" that are going to b
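To make scenario b concrete, here is a sketch of a connector configuration (expressed as a Java map; all names, topics, and addresses are invented) in which the producer targets a different Kafka cluster than the worker, so the offsets consumer must be pointed at that same cluster:

    import java.util.Map;

    public class EosConnectorConfigExample {
        // Illustrative connector config for the bootstrap.servers mismatch scenario above.
        static final Map<String, String> CONFIG = Map.of(
                "name", "my-source-connector",
                "connector.class", "com.example.MySourceConnector",    // hypothetical connector class
                "offsets.storage.topic", "my-source-offsets",          // per-connector offsets topic proposed by the KIP
                "producer.override.bootstrap.servers", "other-cluster:9092",
                // Must match the producer's cluster, or the offsets consumer may read a stale topic:
                "consumer.override.bootstrap.servers", "other-cluster:9092");
    }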
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Randall, Thanks for taking a look! Responses inline: > This would seem to make the transactions based upon time rather than record count. Is that really the intent? Did you consider using one transaction per "batch" of records, whatever number of records that means? One issue with the time-based boundary is that `offset.flush.interval.ms` defaults to 60 seconds, and that could mean many hundreds of thousands of records on a high throughput task. (BTW, I do agree with the KIP's rejected alternative to not expose to the task or let the task define the transaction boundaries.) Yep, this was the intent. I covered a few alternatives in the "Finer control over offset commits" section in the rejected alternatives all the way at the bottom, including allowing source tasks to define their own transaction boundaries, allowing per-connector source offset intervals, and allowing offsets to be committed after a certain number of records. While writing this up I also realized that, besides establishing batch boundaries based on the number of records, another possibility could also be to count each collection of records returned from a single call to SourceTask::poll as its own batch, although this might seriously hamper throughput for tasks that provide a small number of records at a time, but do so with high frequency. I was hoping we could save this for future work since it's unclear that it would be necessary to operate Connect with exactly-once source support. It almost seems like a bit of a leaky abstraction for users to have to worry about the details of offset commit logic, and I'm hoping we can do some magic behind the curtains to avoid this. Here's a few thoughts on how we can keep fixed-time-interval offset flushing with exactly-once source support without making things worse for users: 1. Ignore the offset flush timeout parameter for exactly-once source tasks. They should be allowed to take as long as they want to complete an offset commit, since the cost of failure at that point is to fail the source task. This might require a change to how Connect manages source task offset commits right now since a single worker-global thread is used to schedule and perform offset commits for all source tasks, and any blocked task will also block offset commits for all other tasks, but I'm confident that we can find a way to solve this problem and would like to leave it as an implementation detail that doesn't need to be covered extensively in this KIP. 2. The only other problem I foresee with high-throughput tasks like the ones you described is that it may take longer than the transaction timeout for a task to flush all of its records to Kafka. In this case, there are two actions that users can take to nurse their connector back to health: reconfigure it to produce records with a lower throughput, or increase the transaction timeout for the producers used by the connector. We can include these steps in the error message for a task that fails due to producer transaction timeout. Does this seem reasonable? If so, I'll add this info to the design doc. > Gwen's #5 question was about rebalances of a source connector. Your answer made sense, but while the KIP talks about source tasks being stopped before the connector is rebalanced, I'm wondering about the delay or other impact this has on the rebalance itself. The "Cannot fence out producers during rebalance" section talks about fencing out producers, but IIUC this is not the same as: > ... 
all workers will preemptively stop all tasks of all source connectors for which task configurations. > If this is already addressed in the KIP, please just point me to that section. Not addressed yet :) I haven't called out a potential impact on rebalance because, although there will probably be one, I don't believe it'll be substantial. After a connector is reconfigured and its new task configs are written to the config topic, the sequence of events for a single worker would be: 1. New task configs are read from the config topic 2. All tasks for that connector are stopped in parallel (which is already how en-masse task start/stop is performed by a distributed worker) 3. Any tasks that take longer than the graceful task shutdown timeout are abandoned and allowed to continue running 4. Worker rejoins the group The only new impact on rebalance might come from steps 2-3, but it's unlikely to be significant. Distributed workers currently use a pool of eight threads to handle en-masse start/stop of tasks and connectors, have a default graceful task shutdown timeout of five seconds, and a default rebalance timeout of sixty seconds. In order to block the worker for an extra thirty seconds (only half of the rebalance timeout), there would have to be (30 / 5) * 8 = 48 tasks that are either completely hung or drag their feet for five seconds each during shutdown. This would have to either come from a single connector, or a collection of connectors that were reconfigured in ex
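As a concrete illustration of the second remedy above (raising the transaction timeout), a user could apply a KIP-458 per-connector producer override. The fifteen-minute value below is an assumption chosen only for illustration, and the broker-side transaction.max.timeout.ms must be at least as large for the producer to start:

    import java.util.Map;

    public class TxnTimeoutOverrideExample {
        // Patch to merge into the connector's existing configuration (illustrative value).
        static final Map<String, String> PATCH = Map.of(
                // Raise the producer transaction timeout from the 60s client default to 15 minutes
                "producer.override.transaction.timeout.ms", "900000");
    }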
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hey, Chris. Thanks again for all your hard work figuring out this complicated problem. While there are still issues to iron out, at the moment it seems pretty sensible. Here's my follow up email with additional comments/questions. I've organized these by section to help provide context, and I've numbered the questions to make it easier to reply/discuss. I apologize for the length of this email -- I thought it better to do this all at once rather than string out in several emails. Motivation 1. This section does not mention another important source of duplicate records: producer retries. Using the idempotent producer can prevent duplicates due to retries, but it can't help with anything else. Perhaps it's worth mentioning? Public Interfaces 2. Have we considered `exactly.once.source.enabled=true` by default in the worker config? We're currently on track for considering this KIP for inclusion in AK 3.0, so we are allowed backward incompatible changes. 3. It is possible (but maybe unlikely) that existing source connectors might already have a connector config property that matches the proposed new connector config `offsets.storage.topic`. It's good that this matches the worker's config, but were any other property names considered? 4. I see that one of the rejected alternatives was a connector-level option to disable EOS. For example, what happens if some source connectors don't work well with EOS (e.g., the transaction blocking behavior of a multi-task source connector is unacceptable)? What would we say if a user wants to use EOS for some source connectors but not others? Is it really viable to not allow that? 5. The worker source task will create a consumer for the offset topic used by the connector. Currently, Connect does not use or do anything with `consumer.override.*` properties, but won't they now likely be needed with a source connector using EOS? What happens with existing source connector configurations that don't define these? Proposed Changes Offset reads 6. The KIP says "a message will be logged notifying them of this fact." Should it be more specific, namely "a warning message..."? SourceTask record commit API 7. The second paragraph talks about why we cannot guarantee the methods be invoked after every successful commit, and I think that makes sense because there is no other practical way to handle worker failures. It does seem like it would be more useful to describe the expectation about calling these methods during normal task shutdown. Specifically, should connector developers expect that these methods will be called following a successful commit immediately prior to stopping the task (barring worker failures)? Per-connector offsets topic This section says: > Finally, it allows users to limit the effect that hanging transactions on > an offsets topic will have. If tasks A and B use the same offsets topic, > and task A initiates a transaction on that offsets topic right before task > B starts up, then task A dies suddenly without committing its transaction, > task B will have to wait for that transaction to time out before it can > read to the end of the offsets topic. If the transaction timeout is set > very high for task A (to accommodate bursts of high throughput, for > example), this will block task B from processing any data for a long time. > Although this scenario may be unavoidable in some cases, using a dedicated > offsets topic for each connector should allow cluster administrators to > isolate the blast radius of a hanging transaction on an offsets topic. 
This > way, although tasks of the same connector may still interfere with each > other, they will at least not interfere with tasks of other connectors. > This should be sufficient for most multitenant environments. 8. Won't there also be transaction contention just within a single connector using multiple tasks? Even if that connector has all offsets go to a dedicated topic (separate from all other connectors), then isn't it still possible/likely that a transaction from task 1 blocks a transaction from task 2? How will this affect latency perceived by consumers (e.g., the records from task 2's transaction don't become visible until the transactions from task 1 and task 2 commit)? Migration 9. This section says: > the worker will read to the end of the offsets topic before starting the > task. If there are no offsets for the connector present in the topic and > there is no sentinel value... This sounds like the worker will know that a sentinel value is expected, but IIUC a worker cannot know that. Instead, the worker consuming the topic must read to the end and expect _either_ a sentinel value _or_ committed offsets. Can the text make this more clear? Task count records 10. This section says "then workers will not bring up tasks for the connector." What does this mean? Does it mean the tasks fail? How do users recover from this? I think the "Addressed failure/degradation scenarios" section tries to address some of
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Chris, Thanks for the well-written KIP and the more recent updates. It's exciting to envision EOS for Connect source connectors! I'll follow up this email with another that goes into specific comments and questions not already touched on by others. This email addresses comments and questions previously discussed on this thread. I'd like to +1 your response to Gwen's question about connector-specific offsets topic (#2 in her email). But importantly, a connector can override any producer settings, including Kafka credentials that don't have permissions on the internal Connect offsets topic. For this reason, I think it's necessary for this KIP to address those scenarios. Gwen's #4 question was about how transactions related to the batches source connector tasks return from their `poll()` method, and you followed up by clarifying the document to make this more clear. IIUC, the KIP suggests still using the existing `offset.flush.interval.ms` property and then committing transactions based (solely?) upon the value of the property. This would seem to make the transactions based upon time rather than record count. Is that really the intent? Did you consider using one transaction per "batch" of records, whatever number of records that means? One issue with the time-based boundary is that `offset.flush.interval.ms` defaults to 60 seconds, and that could mean many hundreds of thousands of records on a high throughput task. (BTW, I do agree with the KIP's rejected alternative to not expose to the task or let the task define the transaction boundaries.) Gwen's #5 question was about rebalances of a source connector. Your answer made sense, but while the KIP talks about source tasks being stopped before the connector is rebalanced, I'm wondering about the delay or other impact this has on the rebalance itself. The "Cannot fence out producers during rebalance" section talks about fencing out producers, but IIUC this is not the same as: ... all workers will preemptively stop all tasks of all source connectors for which task configurations. If this is already addressed in the KIP, please just point me to that section. Finally, regarding Guozhang's question about "partial rebalances" rather than fencing out all producers. In your initial response, you said the following: Connectors can arbitrarily reassign source partitions (such as database tables, or Kafka topic partitions) across tasks, so even if the assignment of tasks across workers remains unchanged, the assignment of source partitions across those workers might. This is a very important point. In fact, there are definitely connectors that shuffle around "source partition" assignments to different tasks, and IMO we have to take that into consideration because Connect provides no guidance on this and because there are good reasons to do this. So I'm glad the KIP takes this into account. This is actually one of the issues I ran into during an earlier attempt to POC EOS for source connectors, and the ability to force fencing is a novel (albeit heavy-handed) way to deal with that. As I said, I'll follow up in subsequent emails on other specific questions I have. Best regards, Randall On Mon, Feb 22, 2021 at 6:45 PM Guozhang Wang wrote: > This is a great explanation, thank you! > > On Mon, Feb 22, 2021 at 2:44 PM Chris Egerton wrote: > > > Hi Guozhang, > > > > Your understanding should be correct in most cases, but there are two > finer > > points that may interest you: > > > > 1. 
It's technically dependent on the implementation of the connector and > > how it chooses to allocate source partitions across its tasks; even if > the > > number of tasks and source partitions remains completely unchanged, a > > connector may still choose to shuffle around the partition->task > > allocation. I can't think of any cases where this might happen off the > top > > of my head, but it seemed worth sharing given the educational nature of > the > > question. > > 2. It's also possible that the number of source partitions remains > > unchanged, but the set of source partitions changes. One case where this > > might happen is with a database connector monitoring for tables that > match > > a given regex every five minutes; if a table that matched that regex > during > > the last scan got assigned to a task and then dropped, and then another > > table that matched the regex got added before the next scan, the > connector > > would see the same number of tables, but the actual set would be > different. > > At this point, it would again be connector-dependent for whether the > > already-assigned tables stayed assigned to the same tasks. Is anyone else > > reminded of the various consumer partition assignment strategies at this > > point? > > > > A general comment I should make here (not necessarily for your benefit > but > > for anyone following along) is that it's important to keep in mind that > > "source partitions" in Kafka Connect aren't Kafka topic partitions (well, > >
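To put rough numbers on the flush-interval concern raised above: a task producing at, say, 10,000 records per second with the default `offset.flush.interval.ms` of 60,000 ms would accumulate on the order of 60 x 10,000 = 600,000 records in a single transaction, none of which become visible to read_committed consumers until that transaction commits. (The throughput figure is purely illustrative.)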
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
This is a great explanation, thank you! On Mon, Feb 22, 2021 at 2:44 PM Chris Egerton wrote: > Hi Guozhang, > > Your understanding should be correct in most cases, but there are two finer > points that may interest you: > > 1. It's technically dependent on the implementation of the connector and > how it chooses to allocate source partitions across its tasks; even if the > number of tasks and source partitions remains completely unchanged, a > connector may still choose to shuffle around the partition->task > allocation. I can't think of any cases where this might happen off the top > of my head, but it seemed worth sharing given the educational nature of the > question. > 2. It's also possible that the number of source partitions remains > unchanged, but the set of source partitions changes. One case where this > might happen is with a database connector monitoring for tables that match > a given regex every five minutes; if a table that matched that regex during > the last scan got assigned to a task and then dropped, and then another > table that matched the regex got added before the next scan, the connector > would see the same number of tables, but the actual set would be different. > At this point, it would again be connector-dependent for whether the > already-assigned tables stayed assigned to the same tasks. Is anyone else > reminded of the various consumer partition assignment strategies at this > point? > > A general comment I should make here (not necessarily for your benefit but > for anyone following along) is that it's important to keep in mind that > "source partitions" in Kafka Connect aren't Kafka topic partitions (well, > unless your connector is designed to replicate data across Kafka clusters > like MirrorMaker 2). As a result, we have to rely on developer-written code > to a greater extent to define what the source partitions for a source > connector are, and how to divvy up that work amongst tasks. > > Hope this helps! If you have any further questions please hit me with them; > I doubt you'll be the only one wondering about these things. > > Cheers, > > Chris > > On Mon, Feb 22, 2021 at 5:30 PM Guozhang Wang wrote: > > > Thanks Chris, yeah I think I agree with you that this does not > necessarily > > have to be in the scope of this KIP. > > > > My understanding was that the source partitions -> tasks are not static > but > > dynamic, but they are only changed when either the number of partitions > > changed or "tasks.max" config changed (please correct me if I'm wrong), > so > > what I'm thinking that we can try to detect if either of these things > > happens, and if they do not happen we can assume the mapping from > > partitions -> tasks does not change --- of course this requires some > > extension on the API, aligned with what you said. I would like to make > sure > > that my understanding here is correct :) > > > > Guozhang > > > > > > On Mon, Feb 22, 2021 at 11:29 AM Chris Egerton > > wrote: > > > > > Hi Guozhang, > > > > > > Thanks for taking a look, and for your suggestion! > > > > > > I think there is room for more intelligent fencing strategies, but I > > think > > > that it'd have to be more nuanced than one based on task->worker > > > assignments. Connectors can arbitrarily reassign source partitions > (such > > as > > > database tables, or Kafka topic partitions) across tasks, so even if > the > > > assignment of tasks across workers remains unchanged, the assignment of > > > source partitions across those workers might. 
Connect doesn't do any > > > inspection of task configurations at the moment, and without expansions > > to > > > the Connector/Task API, it'd likely be impossible to get information > from > > > tasks about their source partition assignments. With that in mind, I > > think > > > we may want to leave the door open for more intelligent task fencing > but > > > not include that level of optimization at this stage. Does that sound > > fair > > > to you? > > > > > > There is one case that I've identified where we can cheaply optimize > > right > > > now: single-task connectors, such as the Debezium CDC source > connectors. > > If > > > a connector is configured at some point with a single task, then some > > other > > > part of its configuration is altered but the single-task aspect > remains, > > > the leader doesn't have to worry about fencing out the older task as > the > > > new task's producer will do that automatically. In this case, the > leader > > > can skip the producer fencing round and just write the new task count > > > record straight to the config topic. I've added this case to the KIP; > if > > it > > > overcomplicates things I'm happy to remove it, but seeing as it serves > a > > > real use case and comes fairly cheap, I figured it'd be best to include > > > now. > > > > > > Thanks again for your feedback; if you have other thoughts I'd love to > > hear > > > them! > > > > > > Cheers, > > > > > > Chris > > > > > > On Mon, Feb 22, 2021
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Guozhang, Your understanding should be correct in most cases, but there are two finer points that may interest you: 1. It's technically dependent on the implementation of the connector and how it chooses to allocate source partitions across its tasks; even if the number of tasks and source partitions remains completely unchanged, a connector may still choose to shuffle around the partition->task allocation. I can't think of any cases where this might happen off the top of my head, but it seemed worth sharing given the educational nature of the question. 2. It's also possible that the number of source partitions remains unchanged, but the set of source partitions changes. One case where this might happen is with a database connector monitoring for tables that match a given regex every five minutes; if a table that matched that regex during the last scan got assigned to a task and then dropped, and then another table that matched the regex got added before the next scan, the connector would see the same number of tables, but the actual set would be different. At this point, it would again be connector-dependent for whether the already-assigned tables stayed assigned to the same tasks. Is anyone else reminded of the various consumer partition assignment strategies at this point? A general comment I should make here (not necessarily for your benefit but for anyone following along) is that it's important to keep in mind that "source partitions" in Kafka Connect aren't Kafka topic partitions (well, unless your connector is designed to replicate data across Kafka clusters like MirrorMaker 2). As a result, we have to rely on developer-written code to a greater extent to define what the source partitions for a source connector are, and how to divvy up that work amongst tasks. Hope this helps! If you have any further questions please hit me with them; I doubt you'll be the only one wondering about these things. Cheers, Chris On Mon, Feb 22, 2021 at 5:30 PM Guozhang Wang wrote: > Thanks Chris, yeah I think I agree with you that this does not necessarily > have to be in the scope of this KIP. > > My understanding was that the source partitions -> tasks are not static but > dynamic, but they are only changed when either the number of partitions > changed or "tasks.max" config changed (please correct me if I'm wrong), so > what I'm thinking that we can try to detect if either of these things > happens, and if they do not happen we can assume the mapping from > partitions -> tasks does not change --- of course this requires some > extension on the API, aligned with what you said. I would like to make sure > that my understanding here is correct :) > > Guozhang > > > On Mon, Feb 22, 2021 at 11:29 AM Chris Egerton > wrote: > > > Hi Guozhang, > > > > Thanks for taking a look, and for your suggestion! > > > > I think there is room for more intelligent fencing strategies, but I > think > > that it'd have to be more nuanced than one based on task->worker > > assignments. Connectors can arbitrarily reassign source partitions (such > as > > database tables, or Kafka topic partitions) across tasks, so even if the > > assignment of tasks across workers remains unchanged, the assignment of > > source partitions across those workers might. Connect doesn't do any > > inspection of task configurations at the moment, and without expansions > to > > the Connector/Task API, it'd likely be impossible to get information from > > tasks about their source partition assignments. 
With that in mind, I > think > > we may want to leave the door open for more intelligent task fencing but > > not include that level of optimization at this stage. Does that sound > fair > > to you? > > > > There is one case that I've identified where we can cheaply optimize > right > > now: single-task connectors, such as the Debezium CDC source connectors. > If > > a connector is configured at some point with a single task, then some > other > > part of its configuration is altered but the single-task aspect remains, > > the leader doesn't have to worry about fencing out the older task as the > > new task's producer will do that automatically. In this case, the leader > > can skip the producer fencing round and just write the new task count > > record straight to the config topic. I've added this case to the KIP; if > it > > overcomplicates things I'm happy to remove it, but seeing as it serves a > > real use case and comes fairly cheap, I figured it'd be best to include > > now. > > > > Thanks again for your feedback; if you have other thoughts I'd love to > hear > > them! > > > > Cheers, > > > > Chris > > > > On Mon, Feb 22, 2021 at 1:57 PM Chris Egerton > wrote: > > > > > Hi Gwen, > > > > > > Thanks for the feedback! > > > > > > 0. > > > That's a great point; I've updated the motivation section with that > > > rationale. > > > > > > 1. > > > This enables safe "hard downgrades" of clusters where, instead of just > > > disabling exactly-once support o
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Thanks Chris, yeah I think I agree with you that this does not necessarily have to be in the scope of this KIP. My understanding was that the source partitions -> tasks are not static but dynamic, but they are only changed when either the number of partitions changed or "tasks.max" config changed (please correct me if I'm wrong), so what I'm thinking that we can try to detect if either of these things happens, and if they do not happen we can assume the mapping from partitions -> tasks does not change --- of course this requires some extension on the API, aligned with what you said. I would like to make sure that my understanding here is correct :) Guozhang On Mon, Feb 22, 2021 at 11:29 AM Chris Egerton wrote: > Hi Guozhang, > > Thanks for taking a look, and for your suggestion! > > I think there is room for more intelligent fencing strategies, but I think > that it'd have to be more nuanced than one based on task->worker > assignments. Connectors can arbitrarily reassign source partitions (such as > database tables, or Kafka topic partitions) across tasks, so even if the > assignment of tasks across workers remains unchanged, the assignment of > source partitions across those workers might. Connect doesn't do any > inspection of task configurations at the moment, and without expansions to > the Connector/Task API, it'd likely be impossible to get information from > tasks about their source partition assignments. With that in mind, I think > we may want to leave the door open for more intelligent task fencing but > not include that level of optimization at this stage. Does that sound fair > to you? > > There is one case that I've identified where we can cheaply optimize right > now: single-task connectors, such as the Debezium CDC source connectors. If > a connector is configured at some point with a single task, then some other > part of its configuration is altered but the single-task aspect remains, > the leader doesn't have to worry about fencing out the older task as the > new task's producer will do that automatically. In this case, the leader > can skip the producer fencing round and just write the new task count > record straight to the config topic. I've added this case to the KIP; if it > overcomplicates things I'm happy to remove it, but seeing as it serves a > real use case and comes fairly cheap, I figured it'd be best to include > now. > > Thanks again for your feedback; if you have other thoughts I'd love to hear > them! > > Cheers, > > Chris > > On Mon, Feb 22, 2021 at 1:57 PM Chris Egerton wrote: > > > Hi Gwen, > > > > Thanks for the feedback! > > > > 0. > > That's a great point; I've updated the motivation section with that > > rationale. > > > > 1. > > This enables safe "hard downgrades" of clusters where, instead of just > > disabling exactly-once support on each worker, each worker is rolled back > > to an earlier version of the Connect framework that doesn't support > > per-connector offsets topics altogether. Those workers would go back to > all > > using a global offsets topic, and any offsets stored in per-connector > > topics would be lost to those workers. This would cause a large number of > > duplicates to flood the downstream system. While technically permissible > > given that the user in this case will have knowingly switched to a > version > > of the Connect framework that doesn't support exactly-once source > > connectors (and is therefore susceptible to duplicate delivery of > records), > > the user experience in this case could be pretty bad. 
A similar situation > > is if users switch back from per-connector offsets topics to the global > > offsets topic. > > I've tried to make this more clear in the KIP by linking to the "Hard > > downgrade" section from the proposed design, and by expanding on the > > rationale provided for redundant global offset writes in the "Hard > > downgrade" section. Let me know if you think this could be improved or > > think a different approach is warranted. > > > > 2. > > I think the biggest difference between Connect and Streams comes from the > > fact that Connect allows users to create connectors that target different > > Kafka clusters on the same worker. This hasn't been a problem in the past > > because workers use two separate producers to write offset data and > source > > connector records, but in order to write offsets and records in the same > > transaction, it becomes necessary to use a single producer, which also > > requires that the internal offsets topic be hosted on the same Kafka > > cluster that the connector is targeting. > > This may sound like a niche use case but it was actually one of the > > driving factors behind KIP-458 ( > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy > ), > > and it's a feature that we rely on heavily today. > > If there's too much going on in this KIP and we'd prefer to drop support > > for running that type of setup with exactly-
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Guozhang, Thanks for taking a look, and for your suggestion! I think there is room for more intelligent fencing strategies, but I think that it'd have to be more nuanced than one based on task->worker assignments. Connectors can arbitrarily reassign source partitions (such as database tables, or Kafka topic partitions) across tasks, so even if the assignment of tasks across workers remains unchanged, the assignment of source partitions across those workers might. Connect doesn't do any inspection of task configurations at the moment, and without expansions to the Connector/Task API, it'd likely be impossible to get information from tasks about their source partition assignments. With that in mind, I think we may want to leave the door open for more intelligent task fencing but not include that level of optimization at this stage. Does that sound fair to you? There is one case that I've identified where we can cheaply optimize right now: single-task connectors, such as the Debezium CDC source connectors. If a connector is configured at some point with a single task, then some other part of its configuration is altered but the single-task aspect remains, the leader doesn't have to worry about fencing out the older task as the new task's producer will do that automatically. In this case, the leader can skip the producer fencing round and just write the new task count record straight to the config topic. I've added this case to the KIP; if it overcomplicates things I'm happy to remove it, but seeing as it serves a real use case and comes fairly cheap, I figured it'd be best to include now. Thanks again for your feedback; if you have other thoughts I'd love to hear them! Cheers, Chris On Mon, Feb 22, 2021 at 1:57 PM Chris Egerton wrote: > Hi Gwen, > > Thanks for the feedback! > > 0. > That's a great point; I've updated the motivation section with that > rationale. > > 1. > This enables safe "hard downgrades" of clusters where, instead of just > disabling exactly-once support on each worker, each worker is rolled back > to an earlier version of the Connect framework that doesn't support > per-connector offsets topics altogether. Those workers would go back to all > using a global offsets topic, and any offsets stored in per-connector > topics would be lost to those workers. This would cause a large number of > duplicates to flood the downstream system. While technically permissible > given that the user in this case will have knowingly switched to a version > of the Connect framework that doesn't support exactly-once source > connectors (and is therefore susceptible to duplicate delivery of records), > the user experience in this case could be pretty bad. A similar situation > is if users switch back from per-connector offsets topics to the global > offsets topic. > I've tried to make this more clear in the KIP by linking to the "Hard > downgrade" section from the proposed design, and by expanding on the > rationale provided for redundant global offset writes in the "Hard > downgrade" section. Let me know if you think this could be improved or > think a different approach is warranted. > > 2. > I think the biggest difference between Connect and Streams comes from the > fact that Connect allows users to create connectors that target different > Kafka clusters on the same worker. 
This hasn't been a problem in the past > because workers use two separate producers to write offset data and source > connector records, but in order to write offsets and records in the same > transaction, it becomes necessary to use a single producer, which also > requires that the internal offsets topic be hosted on the same Kafka > cluster that the connector is targeting. > This may sound like a niche use case but it was actually one of the > driving factors behind KIP-458 ( > https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy), > and it's a feature that we rely on heavily today. > If there's too much going on in this KIP and we'd prefer to drop support > for running that type of setup with exactly-once source connectors for now, > I can propose this in a separate KIP. I figured it'd be best to get this > out of the way with the initial introduction of exactly-once source support > in order to make adoption by existing Connect users as seamless as > possible, and since we'd likely have to address this issue before being > able to utilize the feature ourselves. > I switched around the ordering of the "motivation" section for > per-connector offsets topics to put the biggest factor first, and called it > out as the major difference between Connect and Streams in this case. > > 3. > Fair enough, after giving it a little more thought I agree that allowing > users to shoot themselves in the foot is a bad idea here. There's also some > precedent for handling this with the "enable.idempotence" and " > transactional.id" producer properties; if you specify a transactional ID > but do
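The "automatic" fencing relied on for the single-task case above is the standard transactional-producer epoch bump: when a new producer calls initTransactions() with the same transactional ID, any older producer using that ID is fenced. A minimal sketch of that behaviour (the broker address, topic, and the task-style transactional ID are all made up for illustration):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.ProducerFencedException;

    import java.util.Properties;

    public class FencingSketch {
        static Properties props() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("transactional.id", "connect-cluster-my-connector-0"); // illustrative task ID
            return props;
        }

        public static void main(String[] args) {
            KafkaProducer<String, String> zombie = new KafkaProducer<>(props());
            zombie.initTransactions();

            // The replacement task's producer uses the same transactional ID and bumps the epoch:
            KafkaProducer<String, String> replacement = new KafkaProducer<>(props());
            replacement.initTransactions();

            try {
                zombie.beginTransaction();
                zombie.send(new ProducerRecord<>("some-topic", "key", "value"));
                zombie.commitTransaction(); // the zombie fails with ProducerFencedException by this point
            } catch (ProducerFencedException e) {
                zombie.close(); // a fenced producer must be closed; it can no longer write
            } finally {
                replacement.close();
            }
        }
    }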
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Gwen, Thanks for the feedback! 0. That's a great point; I've updated the motivation section with that rationale. 1. This enables safe "hard downgrades" of clusters where, instead of just disabling exactly-once support on each worker, each worker is rolled back to an earlier version of the Connect framework that doesn't support per-connector offsets topics altogether. Those workers would go back to all using a global offsets topic, and any offsets stored in per-connector topics would be lost to those workers. This would cause a large number of duplicates to flood the downstream system. While technically permissible given that the user in this case will have knowingly switched to a version of the Connect framework that doesn't support exactly-once source connectors (and is therefore susceptible to duplicate delivery of records), the user experience in this case could be pretty bad. A similar situation is if users switch back from per-connector offsets topics to the global offsets topic. I've tried to make this more clear in the KIP by linking to the "Hard downgrade" section from the proposed design, and by expanding on the rationale provided for redundant global offset writes in the "Hard downgrade" section. Let me know if you think this could be improved or think a different approach is warranted. 2. I think the biggest difference between Connect and Streams comes from the fact that Connect allows users to create connectors that target different Kafka clusters on the same worker. This hasn't been a problem in the past because workers use two separate producers to write offset data and source connector records, but in order to write offsets and records in the same transaction, it becomes necessary to use a single producer, which also requires that the internal offsets topic be hosted on the same Kafka cluster that the connector is targeting. This may sound like a niche use case but it was actually one of the driving factors behind KIP-458 ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy), and it's a feature that we rely on heavily today. If there's too much going on in this KIP and we'd prefer to drop support for running that type of setup with exactly-once source connectors for now, I can propose this in a separate KIP. I figured it'd be best to get this out of the way with the initial introduction of exactly-once source support in order to make adoption by existing Connect users as seamless as possible, and since we'd likely have to address this issue before being able to utilize the feature ourselves. I switched around the ordering of the "motivation" section for per-connector offsets topics to put the biggest factor first, and called it out as the major difference between Connect and Streams in this case. 3. Fair enough, after giving it a little more thought I agree that allowing users to shoot themselves in the foot is a bad idea here. There's also some precedent for handling this with the "enable.idempotence" and " transactional.id" producer properties; if you specify a transactional ID but don't specify a value for idempotence, the producer just does the right thing for you by enabling idempotence and emitting a log message letting you know that it's done so. I've adjusted the proposed behavior to try to use a similar approach; let me know what you think. 
There is the potential gap here where, sometime in the future, a third accepted value for the "isolation.level" property is added to the consumer API and users will be unable to use that new value for their worker. But the likelihood of footgunning seems much greater than this scenario, and we can always address expansions to the consumer API with changes to the Connect framework as well if/when that becomes necessary. I've also added a similar note to the source task's transactional ID property; user overrides of it will also be disabled. 4. Yeah, that's mostly correct. I tried to touch on this in the "Motivation" section with this bit: > The Connect framework periodically writes source task offsets to an internal Kafka topic at a configurable interval, once the source records that they correspond to have been successfully sent to Kafka. I've expanded on this in the "Offset (and record) writes" section, and I've tweaked the "Motivation" section a little bit to add a link to the relevant config property and to make the language a little more accurate. 5. This isn't quite as bad as stop-the-world; more like stop-the-connector. If a worker is running a dozen connectors and one of those (that happens to be a source) is reconfigured, only the tasks for that connector will be preemptively halted, and all other tasks and connectors will continue running. This is pretty close to the current behavior with incremental rebalancing; the only difference is that, instead of waiting for the rebalance to complete before halting the tasks for that connector, the worker will halt them in preparation
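The producer precedent referred to above, sketched out (the broker address and transactional ID are placeholders): setting only transactional.id is enough, and the client turns idempotence on by itself.

    import org.apache.kafka.clients.producer.KafkaProducer;

    import java.util.Properties;

    public class IdempotencePrecedent {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("transactional.id", "my-app"); // placeholder ID
            // enable.idempotence is deliberately left unset: because a transactional.id is
            // present, the producer enables idempotence and logs that it has done so.
            // (Explicitly setting enable.idempotence=false here would fail configuration.)
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.initTransactions();
            }
        }
    }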
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hello Chris, Thanks for the great write-up! I was mainly reviewing the admin fenceProducers API of the KIP. I think it makes sense overall. I'm just wondering if we can go one step further: instead of forcing a fence-out of all producers of the previous generation, could we try to achieve "partial rebalance" by first generating the new assignment, and then, based on the new assignment, only fencing out producers involved in tasks that are indeed migrated? Just a wild thought to bring up for debate. Guozhang On Sat, Feb 20, 2021 at 10:20 PM Gwen Shapira wrote: > Hey Chris, > > Thank you for the proposal. Few questions / comments: > > 0. It may be worth pointing out in the motivation section that > source-connector exactly once is more important than sink connector > exactly once, since many target systems will have unique key > constraint mechanisms that will prevent duplicates. Kafka does not > have any such constraints, so without this KIP-618, exactly once won't > be possible. > 1. I am not clear why we need the worker to async copy offsets from > the connector-specific offset topic to a global offsets topic > 2. While the reasoning you have for offset topic per connector appears > sound, it doesn't add up with the use of transactions in KafkaStreams. > My understanding is that KafkaStreams uses shared offsets topic with > all the other consumers, and (apparently) corrupting data and delays > by other tenants is a non-issue. Perhaps you can comment on how > Connect is different? In general much of the complexity in the KIP is > related to the separate offset topic, and I'm wondering if this can be > avoided. The migration use-case is interesting, but not related to > exactly-once and can be handled separately. > 3. Allowing users to override the isolation level for the offset > reader, even when exactly-once is enabled, thereby disabling > exactly-once in a non-obvious way. I get that connect usually allows > users to shoot themselves in the foot, but are there any actual > benefits for allowing it in this case? Maybe it is better if we don't? > I don't find the argument that we always did this to be particularly > compelling. > 4. It isn't stated explicitly, but it sounds like connect or source > connectors already have some batching mechanism, and that transaction > boundaries will match the batches (i.e. each batch will be a > transaction?). If so, worth being explicit. > 5. "When a rebalance is triggered, before (re-)joining the cluster > group, all workers will preemptively stop all tasks of all source > connectors for which task configurations are present in the config > topic after the latest task count record" - how will this play with > the incremental rebalances? isn't this exactly the stop-the-world > rebalance we want to avoid? > 6. 
"the worker will instantiate a transactional producer whose > transactional ID is, by default, the group ID of the cluster (but may > be overwritten by users using the transactional.id worker property)" - > If users change transactional.id property, zombie leaders won't get > fenced (since they will have an older and different transactional id) > > Thanks, > > Gwen > > On Thu, May 21, 2020 at 11:21 PM Chris Egerton > wrote: > > > > Hi all, > > > > I know it's a busy time with the upcoming 2.6 release and I don't expect > > this to get a lot of traction until that's done, but I've published a KIP > > for allowing atomic commit of offsets and records for source connectors > and > > would appreciate your feedback: > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets > > > > This feature should make it possible to implement source connectors with > > exactly-once delivery guarantees, and even allow a wide range of existing > > source connectors to provide exactly-once delivery guarantees with no > > changes required. > > > > Cheers, > > > > Chris > > > > -- > Gwen Shapira > Engineering Manager | Confluent > 650.450.2760 | @gwenshap > Follow us: Twitter | blog > -- -- Guozhang
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hey Chris, Thank you for the proposal. A few questions / comments:

0. It may be worth pointing out in the motivation section that source-connector exactly-once is more important than sink-connector exactly-once, since many target systems will have unique key constraint mechanisms that will prevent duplicates. Kafka does not have any such constraints, so without this KIP-618, exactly-once won't be possible.

1. I am not clear why we need the worker to asynchronously copy offsets from the connector-specific offset topic to a global offsets topic.

2. While the reasoning you have for an offset topic per connector appears sound, it doesn't add up with the use of transactions in Kafka Streams. My understanding is that Kafka Streams uses a shared offsets topic with all the other consumers, and (apparently) corrupted data and delays caused by other tenants are a non-issue. Perhaps you can comment on how Connect is different? In general much of the complexity in the KIP is related to the separate offset topic, and I'm wondering if this can be avoided. The migration use case is interesting, but not related to exactly-once and can be handled separately.

3. The KIP allows users to override the isolation level for the offset reader, even when exactly-once is enabled, thereby disabling exactly-once in a non-obvious way. I get that Connect usually allows users to shoot themselves in the foot, but are there any actual benefits to allowing it in this case? Maybe it is better if we don't? I don't find the argument that we always did this to be particularly compelling.

4. It isn't stated explicitly, but it sounds like Connect or source connectors already have some batching mechanism, and that transaction boundaries will match the batches (i.e. each batch will be a transaction?). If so, it's worth being explicit.

5. "When a rebalance is triggered, before (re-)joining the cluster group, all workers will preemptively stop all tasks of all source connectors for which task configurations are present in the config topic after the latest task count record" - how will this play with the incremental rebalances? Isn't this exactly the stop-the-world rebalance we want to avoid?

6. "the worker will instantiate a transactional producer whose transactional ID is, by default, the group ID of the cluster (but may be overwritten by users using the transactional.id worker property)" - if users change the transactional.id property, zombie leaders won't get fenced (since they will have an older and different transactional ID).

Thanks, Gwen

On Thu, May 21, 2020 at 11:21 PM Chris Egerton wrote: > > Hi all, > > I know it's a busy time with the upcoming 2.6 release and I don't expect > this to get a lot of traction until that's done, but I've published a KIP > for allowing atomic commit of offsets and records for source connectors and > would appreciate your feedback: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets > > This feature should make it possible to implement source connectors with > exactly-once delivery guarantees, and even allow a wide range of existing > source connectors to provide exactly-once delivery guarantees with no > changes required. > > Cheers, > > Chris -- Gwen Shapira Engineering Manager | Confluent 650.450.2760 | @gwenshap Follow us: Twitter | blog
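Question 6 above comes down to why fencing requires a stable transactional ID. The Java sketch below shows the mechanism (the "group ID as transactional ID" default comes from the KIP discussion; the helper itself is illustrative): a new leader that calls initTransactions() with the same transactional.id bumps the producer epoch, so the old (zombie) leader's next transactional operation fails with ProducerFencedException. If each leader could pick its own transactional.id, old and new leaders would never share an ID and no fencing would occur.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class LeaderFencingSketch {
    // Hypothetical helper: the transactional producer a leader would use.
    // Fencing of zombie leaders only works because every leader uses the
    // same transactional ID (here, the Connect cluster's group ID).
    static KafkaProducer<byte[], byte[]> leaderProducer(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, groupId);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // bumps the epoch and fences any earlier producer with the same ID
        return producer;
    }
}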
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Jason, Fine by me; I wanted to be conservative with the return type but the case you've outlined sounds enticing enough that adding a little flexibility to the API seems warranted. I've added your suggestion to the proposed admin API expansions; let me know what you think. Cheers, Chris On Mon, Feb 1, 2021 at 3:38 PM Jason Gustafson wrote: > Hi Chris, > > If we add the new `fenceProducers` admin API, can we return the information > from the `InitProducerId` response (i.e. producer id and epoch)? We may not > have a use case for it yet, but I don't see any harm exposing it for the > future. For example, we could allow this state to be provided to the > Producer instance on initialization, which would save the need for the > second `InitProducerId` request in the current proposal. Also, the `Void` > type does give us much room for extension. > > -Jason > > > On Mon, Jan 25, 2021 at 7:29 AM Chris Egerton wrote: > > > Hi Ning, > > > > Apologies for the delay in response. I realized after publishing the KIP > > that there were some finer points I hadn't considered in my design and > that > > it was far from providing exactly-once guarantees. In response to your > > questions: > > > > 1) The goal of the KIP is to ensure the accuracy of the offsets that the > > framework provides to source tasks; if tasks choose to manage offsets > > outside of the framework, they're on their own. So, the source records > and > > their offsets will be written/committed to Kafka, and the task will be > > provided them on startup, but it (or really, its predecessor) may not > have > > had time to do cleanup on resources associated with those records before > > being killed. > > > > 2) I've cleaned up this section and removed the pseudocode as it seems > too > > low-level to be worth discussing in a KIP. I'll try to summarize here, > > though: task.commit() is not what causes offsets provided to the > framework > > by tasks to be committed; it's simply a follow-up hook provided out of > > convenience to tasks so that they can clean up resources associated with > > the most recent batch of records (by ack'ing JMS messages, for example). > > The Connect framework uses an internal Kafka topic to store source task > > offsets. > > > > 3) In order to benefit from the improvements proposed in this KIP, yes, > the > > single source-of-truth should be the OffsetStorageReader provided to the > > task by the Connect framework, at least at startup. After startup, tasks > > should ideally bookkeep their own offset progress as each request to read > > offsets requires a read to the end of the offsets topic, which can be > > expensive in some cases. > > > > I've since expanded the KIP to include general exactly-once support for > > source connectors that should cover the points I neglected in my initial > > design, so it should be ready for review again. > > > > Cheers, > > > > Chris > > > > On Mon, Jul 27, 2020 at 11:42 PM Ning Zhang > > wrote: > > > > > Hello Chris, > > > > > > That is an interesting KIP. I have a couple of questions: > > > > > > (1) in section of pseudo-code, what if the failure happens between 4(b) > > > and 5(a), meaning after the producer commit the transaction, and before > > > task.commitRecord(). > > > > > > (2) in section "source task life time", what is the difference between > > > "commit offset" and "offsets to commit"? 
Given that the offset storage > > can > > > be a Kafka topic (/KafkaOffsetBackingStore.java) and producer could > only > > > produce to a kafka topic, are / is the topic(s) the same ? (the topic > > that > > > producer writes offsets to and the topic task.commit() to) > > > > > > (3) for JDBC source task, it relies on `context.offsetStorageReader()` > ( > > > > > > https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java#L140 > > ) > > > to retrieve the previously committed offset (if from a fresh start or > > > resume from failure). so it seems that the single-source-of-truth of > > where > > > to consume from last known / committed position stored in offset > storage > > > (e.g. kafka topic) managed by the periodic task.commit()? > > > > > > On 2020/05/22 06:20:51, Chris Egerton wrote: > > > > Hi all, > > > > > > > > I know it's a busy time with the upcoming 2.6 release and I don't > > expect > > > > this to get a lot of traction until that's done, but I've published a > > KIP > > > > for allowing atomic commit of offsets and records for source > connectors > > > and > > > > would appreciate your feedback: > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets > > > > > > > > This feature should make it possible to implement source connectors > > with > > > > exactly-once delivery guarantees, and even allow a wide range of > > existing > > > > source connectors to provide exactly-once delivery guarantees with no > > > > changes required
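For readers following the admin API discussion above, a rough sketch of the shape being converged on might look like the following; the names and signatures here are illustrative only (the idea is fencing by transactional ID while exposing the producer ID and epoch from the InitProducerId response, as Jason suggested), not the final KIP-618 interfaces.

import java.util.Collection;
import org.apache.kafka.common.KafkaFuture;

// Hypothetical shape of the proposed admin API expansion; not the final interfaces.
interface FenceProducersSketch {

    // Fence out all producers currently using the given transactional IDs.
    FenceProducersResultSketch fenceProducers(Collection<String> transactionalIds);

    interface FenceProducersResultSketch {
        // Per-transactional-ID futures exposing the state returned by the broker,
        // rather than a plain Void result.
        KafkaFuture<Long> producerId(String transactionalId);
        KafkaFuture<Short> epoch(String transactionalId);
        KafkaFuture<Void> all();
    }
}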
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Chris, If we add the new `fenceProducers` admin API, can we return the information from the `InitProducerId` response (i.e. producer id and epoch)? We may not have a use case for it yet, but I don't see any harm exposing it for the future. For example, we could allow this state to be provided to the Producer instance on initialization, which would save the need for the second `InitProducerId` request in the current proposal. Also, the `Void` type does not give us much room for extension. -Jason On Mon, Jan 25, 2021 at 7:29 AM Chris Egerton wrote: > Hi Ning, > > Apologies for the delay in response. I realized after publishing the KIP > that there were some finer points I hadn't considered in my design and that > it was far from providing exactly-once guarantees. In response to your > questions: > > 1) The goal of the KIP is to ensure the accuracy of the offsets that the > framework provides to source tasks; if tasks choose to manage offsets > outside of the framework, they're on their own. So, the source records and > their offsets will be written/committed to Kafka, and the task will be > provided them on startup, but it (or really, its predecessor) may not have > had time to do cleanup on resources associated with those records before > being killed. > > 2) I've cleaned up this section and removed the pseudocode as it seems too > low-level to be worth discussing in a KIP. I'll try to summarize here, > though: task.commit() is not what causes offsets provided to the framework > by tasks to be committed; it's simply a follow-up hook provided out of > convenience to tasks so that they can clean up resources associated with > the most recent batch of records (by ack'ing JMS messages, for example). > The Connect framework uses an internal Kafka topic to store source task > offsets. > > 3) In order to benefit from the improvements proposed in this KIP, yes, the > single source-of-truth should be the OffsetStorageReader provided to the > task by the Connect framework, at least at startup. After startup, tasks > should ideally bookkeep their own offset progress as each request to read > offsets requires a read to the end of the offsets topic, which can be > expensive in some cases. > > I've since expanded the KIP to include general exactly-once support for > source connectors that should cover the points I neglected in my initial > design, so it should be ready for review again. > > Cheers, > > Chris > > On Mon, Jul 27, 2020 at 11:42 PM Ning Zhang > wrote: > > > Hello Chris, > > > > That is an interesting KIP. I have a couple of questions: > > > > (1) in section of pseudo-code, what if the failure happens between 4(b) > > and 5(a), meaning after the producer commit the transaction, and before > > task.commitRecord(). > > > > (2) in section "source task life time", what is the difference between > > "commit offset" and "offsets to commit"? Given that the offset storage > can > > be a Kafka topic (/KafkaOffsetBackingStore.java) and producer could only > > produce to a kafka topic, are / is the topic(s) the same ? (the topic > that > > producer writes offsets to and the topic task.commit() to) > > > > (3) for JDBC source task, it relies on `context.offsetStorageReader()` ( > > > https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java#L140 > ) > > to retrieve the previously committed offset (if from a fresh start or > > resume from failure). 
so it seems that the single-source-of-truth of > where > > to consume from last known / committed position stored in offset storage > > (e.g. kafka topic) managed by the periodic task.commit()? > > > > On 2020/05/22 06:20:51, Chris Egerton wrote: > > > Hi all, > > > > > > I know it's a busy time with the upcoming 2.6 release and I don't > expect > > > this to get a lot of traction until that's done, but I've published a > KIP > > > for allowing atomic commit of offsets and records for source connectors > > and > > > would appreciate your feedback: > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets > > > > > > This feature should make it possible to implement source connectors > with > > > exactly-once delivery guarantees, and even allow a wide range of > existing > > > source connectors to provide exactly-once delivery guarantees with no > > > changes required. > > > > > > Cheers, > > > > > > Chris > > > > > >
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi Ning, Apologies for the delay in response. I realized after publishing the KIP that there were some finer points I hadn't considered in my design and that it was far from providing exactly-once guarantees. In response to your questions: 1) The goal of the KIP is to ensure the accuracy of the offsets that the framework provides to source tasks; if tasks choose to manage offsets outside of the framework, they're on their own. So, the source records and their offsets will be written/committed to Kafka, and the task will be provided them on startup, but it (or really, its predecessor) may not have had time to do cleanup on resources associated with those records before being killed. 2) I've cleaned up this section and removed the pseudocode as it seems too low-level to be worth discussing in a KIP. I'll try to summarize here, though: task.commit() is not what causes offsets provided to the framework by tasks to be committed; it's simply a follow-up hook provided out of convenience to tasks so that they can clean up resources associated with the most recent batch of records (by ack'ing JMS messages, for example). The Connect framework uses an internal Kafka topic to store source task offsets. 3) In order to benefit from the improvements proposed in this KIP, yes, the single source-of-truth should be the OffsetStorageReader provided to the task by the Connect framework, at least at startup. After startup, tasks should ideally bookkeep their own offset progress as each request to read offsets requires a read to the end of the offsets topic, which can be expensive in some cases. I've since expanded the KIP to include general exactly-once support for source connectors that should cover the points I neglected in my initial design, so it should be ready for review again. Cheers, Chris On Mon, Jul 27, 2020 at 11:42 PM Ning Zhang wrote: > Hello Chris, > > That is an interesting KIP. I have a couple of questions: > > (1) in section of pseudo-code, what if the failure happens between 4(b) > and 5(a), meaning after the producer commit the transaction, and before > task.commitRecord(). > > (2) in section "source task life time", what is the difference between > "commit offset" and "offsets to commit"? Given that the offset storage can > be a Kafka topic (/KafkaOffsetBackingStore.java) and producer could only > produce to a kafka topic, are / is the topic(s) the same ? (the topic that > producer writes offsets to and the topic task.commit() to) > > (3) for JDBC source task, it relies on `context.offsetStorageReader()` ( > https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java#L140) > to retrieve the previously committed offset (if from a fresh start or > resume from failure). so it seems that the single-source-of-truth of where > to consume from last known / committed position stored in offset storage > (e.g. kafka topic) managed by the periodic task.commit()? 
> > On 2020/05/22 06:20:51, Chris Egerton wrote: > > Hi all, > > > > I know it's a busy time with the upcoming 2.6 release and I don't expect > > this to get a lot of traction until that's done, but I've published a KIP > > for allowing atomic commit of offsets and records for source connectors > and > > would appreciate your feedback: > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets > > > > This feature should make it possible to implement source connectors with > > exactly-once delivery guarantees, and even allow a wide range of existing > > source connectors to provide exactly-once delivery guarantees with no > > changes required. > > > > Cheers, > > > > Chris > > >
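To illustrate the startup pattern described in point 3 above, here is a minimal Java sketch of a source task that reads its last committed offset once from the framework's OffsetStorageReader and then tracks progress in memory. The source partition key, the "position" offset key, and the class itself are illustrative rather than taken from any real connector.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public abstract class OffsetBootstrapSketchTask extends SourceTask {
    // Illustrative source partition; a real connector would derive this from its config
    private static final Map<String, Object> SOURCE_PARTITION =
            Collections.singletonMap("source", "example");

    protected long position;

    @Override
    public void start(Map<String, String> props) {
        // One read from the framework at startup...
        Map<String, Object> committed = context.offsetStorageReader().offset(SOURCE_PARTITION);
        position = committed != null && committed.get("position") != null
                ? ((Number) committed.get("position")).longValue()
                : 0L;
    }

    @Override
    public abstract List<SourceRecord> poll() throws InterruptedException;
    // ...after which the task advances `position` itself as it emits records,
    // rather than re-reading the offsets topic (which requires a read to the
    // end of the topic each time).
}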
Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hello Chris, That is an interesting KIP. I have a couple of questions:

(1) In the pseudo-code section, what happens if a failure occurs between 4(b) and 5(a), i.e. after the producer commits the transaction but before task.commitRecord() is called?

(2) In the "source task life time" section, what is the difference between "commit offset" and "offsets to commit"? Given that the offset storage can be a Kafka topic (/KafkaOffsetBackingStore.java) and a producer can only produce to a Kafka topic, are the topics the same (the topic that the producer writes offsets to and the topic that task.commit() commits to)?

(3) The JDBC source task relies on `context.offsetStorageReader()` (https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java#L140) to retrieve the previously committed offset (whether starting fresh or resuming from a failure), so it seems that the single source of truth for where to consume from is the last known / committed position stored in offset storage (e.g. a Kafka topic), which is managed by the periodic task.commit()?

On 2020/05/22 06:20:51, Chris Egerton wrote: > Hi all, > > I know it's a busy time with the upcoming 2.6 release and I don't expect > this to get a lot of traction until that's done, but I've published a KIP > for allowing atomic commit of offsets and records for source connectors and > would appreciate your feedback: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets > > This feature should make it possible to implement source connectors with > exactly-once delivery guarantees, and even allow a wide range of existing > source connectors to provide exactly-once delivery guarantees with no > changes required. > > Cheers, > > Chris >
[DISCUSS] KIP-618: Atomic commit of source connector records and offsets
Hi all, I know it's a busy time with the upcoming 2.6 release and I don't expect this to get a lot of traction until that's done, but I've published a KIP for allowing atomic commit of offsets and records for source connectors and would appreciate your feedback: https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets This feature should make it possible to implement source connectors with exactly-once delivery guarantees, and even allow a wide range of existing source connectors to provide exactly-once delivery guarantees with no changes required. Cheers, Chris