Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-06-09 Thread Chris Egerton
Hi Randall,

That's a fair assessment; if a user upgrades their cluster to 3.0 with no
changes to worker or connector configs, it's possible that their cluster
will break if their worker principal(s) lack the necessary ACLs on the
Kafka cluster that hosts the config topic.

If we wanted to take a more conservative approach, we could allow users to
opt in to the use of a transactional producer by their cluster's leader
through some worker configuration property. The rolling upgrade process
from some pre-3.0 cluster to a 3.0+ cluster with exactly-once source
support enabled would become:

1. Upgrade cluster to 3.0 (or a later version, if one is available)
2. Enable the use of a transactional producer by the cluster's leader
3. Enable exactly-once source support

Since steps 1 and 2 could take place within the same rolling upgrade, the
number of rolling upgrades for this new approach would be the same as the
current approach: two. The only downside would be additional configuration
complexity for the worker, and the upgrade process itself would be a little
trickier for users (and potentially more error-prone).

In order to reduce the added configuration complexity as much as possible,
we could expose this intermediate state (workers are on 3.0 and the leader
uses a transactional producer, but exactly-once source support is not
enabled) by renaming the "exactly.once.source.enabled" property to
"exactly.once.source.support", and permitting values of "disabled"
(default), "preparing", and "enabled". The "preparing" and "enabled" values
would provide the same behavior as the current proposal with
"exactly.once.source.enabled" set to "false" and "true", respectively, and
"disabled" would have the same behavior as the current proposal, except
without the use of a transactional producer by the leader.
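To illustrate, a rough sketch of the worker config across the two rolling
upgrades might look like the following (the property name and values are just
the ones proposed above, so treat them as tentative):

# First rolling upgrade: all workers on 3.0+, leader uses a transactional
# producer, but exactly-once source support is not yet enforced
exactly.once.source.support=preparing

# Second rolling upgrade: exactly-once source support fully enabled
exactly.once.source.support=enabled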

I'll update the proposal with this new behavior shortly. Thanks for the
review!

Cheers,

Chris

On Wed, Jun 9, 2021 at 1:02 PM Randall Hauch  wrote:

> Chris,
>
> Sorry for the late question/comment. But the "Breaking Changes" section concerns
> me. IIUC, when a user upgrades their 1.x or 2.x Connect cluster, then when
> they restart their 3.0 worker(s) the workers will fail due to this producer
> requirement even if they make no changes to their worker configs or
> connector configs. Is this correct?
>
> If so, I'm concerned about this. Even though the additional producer ACLs
> are seemingly minor and easy to change, it is likely that users will not
> read the docs before they upgrade, causing their simple upgrade to fail.
> And even though in 3.0 we could allow ourselves to cause breaking changes
> with a major release, I personally would prefer we not have any such
> breaking changes.
>
> Given that, what would be required for us to eliminate that breaking
> change, or change it from a breaking change to a prerequisite for enabling
> EOS support in their cluster?
>
> Thanks,
>
> Randall
>
> On Wed, Jun 2, 2021 at 8:42 AM Chris Egerton 
> wrote:
>
> > Hi Tom,
> >
> > I do agree that it'd be safer to default to "required", but since at the
> > time of the 3.0 release no existing connectors will have implemented the
> > "SourceConnector::exactlyOnceSupport" method, it'd require all Connect
> > users to downgrade to "requested" anyways in order to enable exactly-once
> > support on their workers. The friction there seems a little excessive; we
> > might consider changing the default from "requested" to "required" later
> on
> > down the line after connector developers have had enough time to put out
> > new connector versions that implement the new API. Thoughts?
> >
> > Cheers,
> >
> > Chris
> >
> > On Wed, Jun 2, 2021 at 8:49 AM Tom Bentley  wrote:
> >
> > > Hi Chris,
> > >
> > > Just a minor question: I can see why the default for
> exactly.once.support
> > > is requested (you want a good first-run experience, I assume), but
> it's a
> > > little like engineering a safety catch and then not enabling it.
> Wouldn't
> > > it be safer to default to required, so that there's no way someone can
> > > mistakenly not get EoS without explicitly having configured it?
> > >
> > > Thanks,
> > >
> > > Tom
> > >
> > > On Tue, Jun 1, 2021 at 4:48 PM Chris Egerton wrote:
> > >
> > > > Hi Gunnar,
> > > >
> > > > Thanks for taking a look! I've addressed the low-hanging fruit in the
> > > KIP;
> > > > responses to other comments inline here:
> > > >
> > > > > * TransactionContext: What's the use case for the methods
> accepting a
> > > > source record (commitTransaction(SourceRecord
> > > > record), abortTransaction(SourceRecord record))?
> > > >
> > > > This allows developers to decouple transaction boundaries from record
> > > > batches. If a connector has a configuration that dictates how often
> it
> > > > returns from "SourceTask::poll", for example, it may be easier to
> > define
> > > > multiple transactions within a single batch or a single transaction
> > > across
> > > > several batches than to retrofit the connector

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-06-09 Thread Randall Hauch
Chris,

Sorry for the late question/comment. But the "Breaking Changes" section concerns
me. IIUC, when a user upgrades their 1.x or 2.x Connect cluster, then when
they restart their 3.0 worker(s) the workers will fail due to this producer
requirement even if they make no changes to their worker configs or
connector configs. Is this correct?

If so, I'm concerned about this. Even though the additional producer ACLs
are seemingly minor and easy to change, it is likely that users will not
read the docs before they upgrade, causing their simple upgrade to fail.
And even though in 3.0 we could allow ourselves to cause breaking changes
with a major release, I personally would prefer we not have any such
breaking changes.

Given that, what would be required for us to eliminate that breaking
change, or change it from a breaking change to a prerequisite for enabling
EOS support in their cluster?

Thanks,

Randall

On Wed, Jun 2, 2021 at 8:42 AM Chris Egerton 
wrote:

> Hi Tom,
>
> I do agree that it'd be safer to default to "required", but since at the
> time of the 3.0 release no existing connectors will have implemented the
> "SourceConnector::exactlyOnceSupport" method, it'd require all Connect
> users to downgrade to "requested" anyways in order to enable exactly-once
> support on their workers. The friction there seems a little excessive; we
> might consider changing the default from "requested" to "required" later on
> down the line after connector developers have had enough time to put out
> new connector versions that implement the new API. Thoughts?
>
> Cheers,
>
> Chris
>
> On Wed, Jun 2, 2021 at 8:49 AM Tom Bentley  wrote:
>
> > Hi Chris,
> >
> > Just a minor question: I can see why the default for exactly.once.support
> > is requested (you want a good first-run experience, I assume), but it's a
> > little like engineering a safety catch and then not enabling it. Wouldn't
> > it be safer to default to required, so that there's no way someone can
> > mistakenly not get EoS without explicitly having configured it?
> >
> > Thanks,
> >
> > Tom
> >
> > On Tue, Jun 1, 2021 at 4:48 PM Chris Egerton wrote:
> >
> > > Hi Gunnar,
> > >
> > > Thanks for taking a look! I've addressed the low-hanging fruit in the
> > KIP;
> > > responses to other comments inline here:
> > >
> > > > * TransactionContext: What's the use case for the methods accepting a
> > > source record (commitTransaction(SourceRecord
> > > record), abortTransaction(SourceRecord record))?
> > >
> > > This allows developers to decouple transaction boundaries from record
> > > batches. If a connector has a configuration that dictates how often it
> > > returns from "SourceTask::poll", for example, it may be easier to
> define
> > > multiple transactions within a single batch or a single transaction
> > across
> > > several batches than to retrofit the connector's poll logic to work
> with
> > > transaction boundaries.
> > >
> > > > * SourceTaskContext: Instead of guarding against NSME, is there a way
> > for
> > > a
> > > connector to query the KC version and thus derive its capabilities?
> Going
> > > forward, a generic API for querying capabilities could be nice, so a
> > > connector can query for capabilities of the runtime in a safe and
> > > compatible way.
> > >
> > > This would be a great quality-of-life improvement for connector and
> > > framework developers alike, but I think it may be best left for a
> > separate
> > > KIP. The current approach, clunky though it may be, seems like a
> nuisance
> > > at worst. It's definitely worth addressing but I'm not sure we have the
> > > time to think through all the details thoroughly enough in time for the
> > > upcoming KIP freeze.
> > >
> > > > * SourceConnector: Would it make sense to merge the two methods
> perhaps
> > > and
> > > return one enum of { SUPPORTED, NOT_SUPPORTED,
> SUPPORTED_WITH_BOUNDARIES
> > }?
> > >
> > > Hmm... at first glance I like the idea of merging the two methods a
> lot.
> > > The one thing that gives me pause is that there may be connectors that
> > > would like to define their own transaction boundaries without providing
> > > exactly-once guarantees. We could add UNSUPPORTED_WITH_BOUNDARIES to
> > > accommodate that, but then, it might actually be simpler to keep the
> two
> > > methods separate in case we add some third variable to the mix that
> would
> > > also have to be reflected in the possible ExactlyOnceSupport enum
> values.
> > >
> > > > Or, alternatively return an enum from
> canDefineTransactionBoundaries(),
> > > too; even if it only has two values now, that'd allow for extension in
> > the
> > > future
> > >
> > > This is fine by me; we just have to figure out exactly which enum
> values
> > > would be suitable. It's a little clunky but right now I'm toying with
> > > something like "ConnectorDefinedTransactionBoundaries" with values of
> > > "SUPPORTED" and "NOT_SUPPORTED" and a default of "NOT_SUPPORTED". If we
> > > need more granularity in the future t

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-06-02 Thread Chris Egerton
Hi Tom,

I do agree that it'd be safer to default to "required", but since at the
time of the 3.0 release no existing connectors will have implemented the
"SourceConnector::exactlyOnceSupport" method, it'd require all Connect
users to downgrade to "requested" anyways in order to enable exactly-once
support on their workers. The friction there seems a little excessive; we
might consider changing the default from "requested" to "required" later on
down the line after connector developers have had enough time to put out
new connector versions that implement the new API. Thoughts?
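For what it's worth, users who want the stricter check right away could still
opt in on a per-connector basis; a minimal sketch of the relevant connector
config line (the property name is the one under discussion here, so consider
it tentative):

exactly.once.support=required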

Cheers,

Chris

On Wed, Jun 2, 2021 at 8:49 AM Tom Bentley  wrote:

> Hi Chris,
>
> Just a minor question: I can see why the default for exactly.once.support
> is requested (you want a good first-run experience, I assume), but it's a
> little like engineering a safety catch and then not enabling it. Wouldn't
> it be safer to default to required, so that there's no way someone can
> mistakenly not get EoS without explicitly having configured it?
>
> Thanks,
>
> Tom
>
> On Tue, Jun 1, 2021 at 4:48 PM Chris Egerton 
> wrote:
>
> > Hi Gunnar,
> >
> > Thanks for taking a look! I've addressed the low-hanging fruit in the
> KIP;
> > responses to other comments inline here:
> >
> > > * TransactionContext: What's the use case for the methods accepting a
> > source record (commitTransaction(SourceRecord
> > record), abortTransaction(SourceRecord record))?
> >
> > This allows developers to decouple transaction boundaries from record
> > batches. If a connector has a configuration that dictates how often it
> > returns from "SourceTask::poll", for example, it may be easier to define
> > multiple transactions within a single batch or a single transaction
> across
> > several batches than to retrofit the connector's poll logic to work with
> > transaction boundaries.
> >
> > > * SourceTaskContext: Instead of guarding against NSME, is there a way
> for
> > a
> > connector to query the KC version and thus derive its capabilities? Going
> > forward, a generic API for querying capabilities could be nice, so a
> > connector can query for capabilities of the runtime in a safe and
> > compatible way.
> >
> > This would be a great quality-of-life improvement for connector and
> > framework developers alike, but I think it may be best left for a
> separate
> > KIP. The current approach, clunky though it may be, seems like a nuisance
> > at worst. It's definitely worth addressing but I'm not sure we have the
> > time to think through all the details thoroughly enough in time for the
> > upcoming KIP freeze.
> >
> > > * SourceConnector: Would it make sense to merge the two methods perhaps
> > and
> > return one enum of { SUPPORTED, NOT_SUPPORTED, SUPPORTED_WITH_BOUNDARIES
> }?
> >
> > Hmm... at first glance I like the idea of merging the two methods a lot.
> > The one thing that gives me pause is that there may be connectors that
> > would like to define their own transaction boundaries without providing
> > exactly-once guarantees. We could add UNSUPPORTED_WITH_BOUNDARIES to
> > accommodate that, but then, it might actually be simpler to keep the two
> > methods separate in case we add some third variable to the mix that would
> > also have to be reflected in the possible ExactlyOnceSupport enum values.
> >
> > > Or, alternatively return an enum from canDefineTransactionBoundaries(),
> > too; even if it only has two values now, that'd allow for extension in
> the
> > future
> >
> > This is fine by me; we just have to figure out exactly which enum values
> > would be suitable. It's a little clunky but right now I'm toying with
> > something like "ConnectorDefinedTransactionBoundaries" with values of
> > "SUPPORTED" and "NOT_SUPPORTED" and a default of "NOT_SUPPORTED". If we
> > need more granularity in the future then we can deprecate one or both of
> > them and add new values. Thoughts?
> >
> > > And one general question: in Debezium, we have some connectors that
> > produce
> > records "out-of-bands" to a schema history topic via their own custom
> > producer. Is there any way envisionable where such a producer would
> > participate in the transaction managed by the KC runtime environment?
> >
> > To answer the question exactly as asked: no; transactions cannot be
> shared
> > across producers and until/unless that is changed (which seems unlikely)
> > this won't be possible. However, I'm curious why a source connector would
> > spin up its own producer instead of using "SourceTask::poll" to provide
> > records to Connect. Is it easier to consume from that topic when the
> > connector can define its own (de)serialization format? I'm optimistic
> that
> > if we understand the use case for the separate producer we may still be
> > able to help bridge the gap here, one way or another.
> >
> > > One follow-up question after thinking some more about this; is there
> any
> > limit in terms of duration or size of in-flight, connector-controlled
> > transactions?

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-06-02 Thread Tom Bentley
Hi Chris,

Just a minor question: I can see why the default for exactly.once.support
is requested (you want a good first-run experience, I assume), but it's a
little like engineering a safety catch and then not enabling it. Wouldn't
it be safer to default to required, so that there's no way someone can
mistakenly not get EoS without explicitly having configured it?

Thanks,

Tom

On Tue, Jun 1, 2021 at 4:48 PM Chris Egerton 
wrote:

> Hi Gunnar,
>
> Thanks for taking a look! I've addressed the low-hanging fruit in the KIP;
> responses to other comments inline here:
>
> > * TransactionContext: What's the use case for the methods accepting a
> source record (commitTransaction(SourceRecord
> record), abortTransaction(SourceRecord record))?
>
> This allows developers to decouple transaction boundaries from record
> batches. If a connector has a configuration that dictates how often it
> returns from "SourceTask::poll", for example, it may be easier to define
> multiple transactions within a single batch or a single transaction across
> several batches than to retrofit the connector's poll logic to work with
> transaction boundaries.
>
> > * SourceTaskContext: Instead of guarding against NSME, is there a way for
> a
> connector to query the KC version and thus derive its capabilities? Going
> forward, a generic API for querying capabilities could be nice, so a
> connector can query for capabilities of the runtime in a safe and
> compatible way.
>
> This would be a great quality-of-life improvement for connector and
> framework developers alike, but I think it may be best left for a separate
> KIP. The current approach, clunky though it may be, seems like a nuisance
> at worst. It's definitely worth addressing but I'm not sure we have the
> time to think through all the details thoroughly enough in time for the
> upcoming KIP freeze.
>
> > * SourceConnector: Would it make sense to merge the two methods perhaps
> and
> return one enum of { SUPPORTED, NOT_SUPPORTED, SUPPORTED_WITH_BOUNDARIES }?
>
> Hmm... at first glance I like the idea of merging the two methods a lot.
> The one thing that gives me pause is that there may be connectors that
> would like to define their own transaction boundaries without providing
> exactly-once guarantees. We could add UNSUPPORTED_WITH_BOUNDARIES to
> accommodate that, but then, it might actually be simpler to keep the two
> methods separate in case we add some third variable to the mix that would
> also have to be reflected in the possible ExactlyOnceSupport enum values.
>
> > Or, alternatively return an enum from canDefineTransactionBoundaries(),
> too; even if it only has two values now, that'd allow for extension in the
> future
>
> This is fine by me; we just have to figure out exactly which enum values
> would be suitable. It's a little clunky but right now I'm toying with
> something like "ConnectorDefinedTransactionBoundaries" with values of
> "SUPPORTED" and "NOT_SUPPORTED" and a default of "NOT_SUPPORTED". If we
> need more granularity in the future then we can deprecate one or both of
> them and add new values. Thoughts?
>
> > And one general question: in Debezium, we have some connectors that
> produce
> records "out-of-bands" to a schema history topic via their own custom
> producer. Is there any way envisionable where such a producer would
> participate in the transaction managed by the KC runtime environment?
>
> To answer the question exactly as asked: no; transactions cannot be shared
> across producers and until/unless that is changed (which seems unlikely)
> this won't be possible. However, I'm curious why a source connector would
> spin up its own producer instead of using "SourceTask::poll" to provide
> records to Connect. Is it easier to consume from that topic when the
> connector can define its own (de)serialization format? I'm optimistic that
> if we understand the use case for the separate producer we may still be
> able to help bridge the gap here, one way or another.
>
> > One follow-up question after thinking some more about this; is there any
> limit in terms of duration or size of in-flight, connector-controlled
> transactions? In case of Debezium for instance, there may be cases where we
> tail the TX log from an upstream source database, not knowing whether the
> events we receive belong to a committed or aborted transaction. Would it be
> valid to emit all these events via a transactional task, and in case we
> receive a ROLLBACK event eventually, to abort the pending Kafka
> transaction? Such source transactions could be running for a long time
> potentially, e.g. hours or days (at least in theory). Or would this sort of
> usage not be considered a reasonable one?
>
> I think the distinction between reasonable and unreasonable usage here is
> likely dependent on use cases that people are trying to satisfy with their
> connector, but if I had to guess, I'd say that a different approach is
> probably warranted in most cases if the transaction span

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-06-01 Thread Chris Egerton
Hi Gunnar,

Thanks for taking a look! I've addressed the low-hanging fruit in the KIP;
responses to other comments inline here:

> * TransactionContext: What's the use case for the methods accepting a
source record (commitTransaction(SourceRecord
record), abortTransaction(SourceRecord record))?

This allows developers to decouple transaction boundaries from record
batches. If a connector has a configuration that dictates how often it
returns from "SourceTask::poll", for example, it may be easier to define
multiple transactions within a single batch or a single transaction across
several batches than to retrofit the connector's poll logic to work with
transaction boundaries.

> * SourceTaskContext: Instead of guarding against NSME, is there a way for
a
connector to query the KC version and thus derive its capabilities? Going
forward, a generic API for querying capabilities could be nice, so a
connector can query for capabilities of the runtime in a safe and
compatible way.

This would be a great quality-of-life improvement for connector and
framework developers alike, but I think it may be best left for a separate
KIP. The current approach, clunky though it may be, seems like a nuisance
at worst. It's definitely worth addressing but I'm not sure we have the
time to think through all the details thoroughly enough in time for the
upcoming KIP freeze.

> * SourceConnector: Would it make sense to merge the two methods perhaps
and
return one enum of { SUPPORTED, NOT_SUPPORTED, SUPPORTED_WITH_BOUNDARIES }?

Hmm... at first glance I like the idea of merging the two methods a lot.
The one thing that gives me pause is that there may be connectors that
would like to define their own transaction boundaries without providing
exactly-once guarantees. We could add UNSUPPORTED_WITH_BOUNDARIES to
accommodate that, but then, it might actually be simpler to keep the two
methods separate in case we add some third variable to the mix that would
also have to be reflected in the possible ExactlyOnceSupport enum values.

> Or, alternatively return an enum from canDefineTransactionBoundaries(),
too; even if it only has two values now, that'd allow for extension in the
future

This is fine by me; we just have to figure out exactly which enum values
would be suitable. It's a little clunky but right now I'm toying with
something like "ConnectorDefinedTransactionBoundaries" with values of
"SUPPORTED" and "NOT_SUPPORTED" and a default of "NOT_SUPPORTED". If we
need more granularity in the future then we can deprecate one or both of
them and add new values. Thoughts?
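To sketch that out (using exactly the names toyed with above, so nothing here
is final):

public enum ConnectorDefinedTransactionBoundaries {
    SUPPORTED,
    NOT_SUPPORTED
}

// On SourceConnector; the default mirrors the proposed NOT_SUPPORTED behavior
public ConnectorDefinedTransactionBoundaries canDefineTransactionBoundaries() {
    return ConnectorDefinedTransactionBoundaries.NOT_SUPPORTED;
}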

> And one general question: in Debezium, we have some connectors that
produce
records "out-of-bands" to a schema history topic via their own custom
producer. Is there any way envisionable where such a producer would
participate in the transaction managed by the KC runtime environment?

To answer the question exactly as asked: no; transactions cannot be shared
across producers and until/unless that is changed (which seems unlikely)
this won't be possible. However, I'm curious why a source connector would
spin up its own producer instead of using "SourceTask::poll" to provide
records to Connect. Is it easier to consume from that topic when the
connector can define its own (de)serialization format? I'm optimistic that
if we understand the use case for the separate producer we may still be
able to help bridge the gap here, one way or another.

> One follow-up question after thinking some more about this; is there any
limit in terms of duration or size of in-flight, connector-controlled
transactions? In case of Debezium for instance, there may be cases where we
tail the TX log from an upstream source database, not knowing whether the
events we receive belong to a committed or aborted transaction. Would it be
valid to emit all these events via a transactional task, and in case we
receive a ROLLBACK event eventually, to abort the pending Kafka
transaction? Such source transactions could be running for a long time
potentially, e.g. hours or days (at least in theory). Or would this sort of
usage not be considered a reasonable one?

I think the distinction between reasonable and unreasonable usage here is
likely dependent on use cases that people are trying to satisfy with their
connector, but if I had to guess, I'd say that a different approach is
probably warranted in most cases if the transaction spans across entire
days at a time. If there's no concern about data not being visible to
downstream consumers until its transaction is committed, and the number of
records in the transaction isn't so large that the amount of memory
required to buffer them all locally on a consumer before delivering them to
the downstream application becomes unreasonable, then it would technically be
possible. Connect users would have to be mindful of the following:

- A separate offsets topic for the connector would be highly recommended in
order to avoid crippling other connectors with hanging transactions

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-27 Thread Gunnar Morling
Chris,

One follow-up question after thinking some more about this; is there any
limit in terms of duration or size of in-flight, connector-controlled
transactions? In case of Debezium for instance, there may be cases where we
tail the TX log from an upstream source database, not knowing whether the
events we receive belong to a committed or aborted transaction. Would it be
valid to emit all these events via a transactional task, and in case we
receive a ROLLBACK event eventually, to abort the pending Kafka
transaction? Such source transactions could be running for a long time
potentially, e.g. hours or days (at least in theory). Or would this sort of
usage not be considered a reasonable one?

Thanks,

--Gunnar


On Thu, May 27, 2021 at 11:15 PM Gunnar Morling <gunnar.morl...@googlemail.com> wrote:

> Chris, all,
>
> I've just read KIP-618, and let me congratulate you first of all for this
> impressive piece of work! Here's a few small suggestions and questions I
> had while reading:
>
> * TransactionContext: What's the use case for the methods accepting a
> source record (commitTransaction(SourceRecord
> record), abortTransaction(SourceRecord record))?
> * SourceTaskContext: Typo in "when the sink connector is deployed" ->
> source task
> * SourceTaskContext: Instead of guarding against NSME, is there a way for
> a connector to query the KC version and thus derive its capabilities? Going
> forward, a generic API for querying capabilities could be nice, so a
> connector can query for capabilities of the runtime in a safe and
> compatible way.
> * SourceConnector: exactlyOnceSupport() -> false return value doesn't match
> * SourceConnector: Would it make sense to merge the two methods perhaps
> and return one enum of { SUPPORTED, NOT_SUPPORTED,
> SUPPORTED_WITH_BOUNDARIES }? Or, alternatively return an enum
> from canDefineTransactionBoundaries(), too; even if it only has two values
> now, that'd allow for extension in the future
>
> And one general question: in Debezium, we have some connectors that
> produce records "out-of-bands" to a schema history topic via their own
> custom producer. Is there any way envisionable where such a producer would
> participate in the transaction managed by the KC runtime environment?
>
> Thanks a lot,
>
> --Gunnar
>
>
> On Mon, May 24, 2021 at 6:45 PM Chris Egerton wrote:
>
>> Hi all,
>>
>> Wanted to note here that I've updated the KIP document to include the
>> changes discussed recently. They're mostly located in the "Public
>> Interfaces" section. I suspect discussion hasn't concluded yet and there
>> will probably be a few more changes to come, but wanted to take the
>> opportunity to provide a snapshot of what the current design looks like.
>>
>> Cheers,
>>
>> Chris
>>
>> On Fri, May 21, 2021 at 4:32 PM Chris Egerton 
>> wrote:
>>
>> > Hi Tom,
>> >
>> > Wow, I was way off base! I was thinking that the intent of the fencible
>> > producer was to employ it by default with 3.0, as opposed to only after
>> the
>> > worker-level
>> > "exactly.once.source.enabled" property was flipped on. You are correct
>> > that with the case you were actually describing, there would be no
>> > heightened ACL requirements, and that it would leave room in the future
>> for
>> > exactly-once to be disabled on a per-connector basis (as long as all the
>> > workers in the cluster already had "exactly.once.source.enabled" set to
>> > "true") with no worries about breaking changes.
>> >
>> > I agree that this is something for another KIP; even if we could squeeze
>> > it in in time for this release, it might be a bit much for new users to
>> > take in all at once. But I can add it to the doc as "future work" since
>> > it's a promising idea that could prove valuable to someone who might
>> need
>> > per-connector granularity in the future.
>> >
>> > Thanks for clearing things up; in retrospect your comments make a lot
>> more
>> > sense now, and I hope I've sufficiently addressed them by now.
>> >
>> > PSA for you and everyone else--I plan on updating the doc next week with
>> > the new APIs for connector-defined transaction boundaries,
>> > user-configurable transaction boundaries (i.e., poll vs. interval vs.
>> > connectors), and preflight checks for exactly-once validation (required
>> vs.
>> > requested).
>> >
>> > Cheers,
>> >
>> > Chris
>> >
>> > On Fri, May 21, 2021 at 7:14 AM Tom Bentley 
>> wrote:
>> >
>> >> Hi Chris,
>> >>
>> >> Thanks for continuing to entertain some of these ideas.
>> >>
>> >> > On Fri, May 14, 2021 at 5:06 PM Chris Egerton wrote:
>> >>
>> >> > [...]
>> >> >
>> >> That's true, but we do go from three static ACLs (write/describe on a
>> >> fixed
>> >> > transactional ID, and idempotent write on a fixed cluster) to a
>> dynamic
>> >> > collection of ACLs.
>> >> >
>> >>
>> >> I'm not quite sure I follow, maybe I've lost track. To be clear, I was
>> >> suggesting the use of a 'fencing producer' only in clusters with
>> >> exactly.once.source.ena

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-27 Thread Gunnar Morling
Chris, all,

I've just read KIP-618, and let me congratulate you first of all for this
impressive piece of work! Here's a few small suggestions and questions I
had while reading:

* TransactionContext: What's the use case for the methods accepting a
source record (commitTransaction(SourceRecord
record), abortTransaction(SourceRecord record))?
* SourceTaskContext: Typo in "when the sink connector is deployed" ->
source task
* SourceTaskContext: Instead of guarding against NSME, is there a way for a
connector to query the KC version and thus derive its capabilities? Going
forward, a generic API for querying capabilities could be nice, so a
connector can query for capabilities of the runtime in a safe and
compatible way.
* SourceConnector: exactlyOnceSupport() -> false return value doesn't match
* SourceConnector: Would it make sense to merge the two methods perhaps and
return one enum of { SUPPORTED, NOT_SUPPORTED, SUPPORTED_WITH_BOUNDARIES }?
Or, alternatively return an enum from canDefineTransactionBoundaries(),
too; even if it only has two values now, that'd allow for extension in the
future

And one general question: in Debezium, we have some connectors that produce
records "out-of-bands" to a schema history topic via their own custom
producer. Is there any way envisionable where such a producer would
participate in the transaction managed by the KC runtime environment?

Thanks a lot,

--Gunnar


On Mon, May 24, 2021 at 6:45 PM Chris Egerton wrote:

> Hi all,
>
> Wanted to note here that I've updated the KIP document to include the
> changes discussed recently. They're mostly located in the "Public
> Interfaces" section. I suspect discussion hasn't concluded yet and there
> will probably be a few more changes to come, but wanted to take the
> opportunity to provide a snapshot of what the current design looks like.
>
> Cheers,
>
> Chris
>
> On Fri, May 21, 2021 at 4:32 PM Chris Egerton  wrote:
>
> > Hi Tom,
> >
> > Wow, I was way off base! I was thinking that the intent of the fencible
> > producer was to employ it by default with 3.0, as opposed to only after
> the
> > worker-level
> > "exactly.once.source.enabled" property was flipped on. You are correct
> > that with the case you were actually describing, there would be no
> > heightened ACL requirements, and that it would leave room in the future
> for
> > exactly-once to be disabled on a per-connector basis (as long as all the
> > workers in the cluster already had "exactly.once.source.enabled" set to
> > "true") with no worries about breaking changes.
> >
> > I agree that this is something for another KIP; even if we could squeeze
> > it in in time for this release, it might be a bit much for new users to
> > take in all at once. But I can add it to the doc as "future work" since
> > it's a promising idea that could prove valuable to someone who might need
> > per-connector granularity in the future.
> >
> > Thanks for clearing things up; in retrospect your comments make a lot
> more
> > sense now, and I hope I've sufficiently addressed them by now.
> >
> > PSA for you and everyone else--I plan on updating the doc next week with
> > the new APIs for connector-defined transaction boundaries,
> > user-configurable transaction boundaries (i.e., poll vs. interval vs.
> > connectors), and preflight checks for exactly-once validation (required
> vs.
> > requested).
> >
> > Cheers,
> >
> > Chris
> >
> > On Fri, May 21, 2021 at 7:14 AM Tom Bentley  wrote:
> >
> >> Hi Chris,
> >>
> >> Thanks for continuing to entertain some of these ideas.
> >>
> >> On Fri, May 14, 2021 at 5:06 PM Chris Egerton wrote:
> >>
> >> > [...]
> >> >
> >> That's true, but we do go from three static ACLs (write/describe on a
> >> fixed
> >> > transactional ID, and idempotent write on a fixed cluster) to a
> dynamic
> >> > collection of ACLs.
> >> >
> >>
> >> I'm not quite sure I follow, maybe I've lost track. To be clear, I was
> >> suggesting the use of a 'fencing producer' only in clusters with
> >> exactly.once.source.enabled=true where I imagined the key difference
> >> between the exactly once and fencing cases was how the producer was
> >> configured/used (transactional vs this new fencing semantic). I think
> the
> >> ACL requirements for connector producer principals would therefore be
> the
> >> same as currently described in the KIP. The same is true for the worker
> >> principals (which is the only breaking change you give in the KIP). So I
> >> don't think the fencing idea changes the backwards compatibility story
> >> that's already in the KIP, just allows a safe per-connector
> >> exactly.once=disabled option to be supported (with required as requested
> >> as
> >> we already discussed).
> >>
> >> But I'm wondering whether I've overlooked something.
> >>
> >> Ultimately I think it may behoove us to err on the side of reducing the
> >> > breaking changes here for now and saving them for 4.0 (or some later
> >> major
> >> > release), but would be interes

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-24 Thread Chris Egerton
Hi all,

Wanted to note here that I've updated the KIP document to include the
changes discussed recently. They're mostly located in the "Public
Interfaces" section. I suspect discussion hasn't concluded yet and there
will probably be a few more changes to come, but wanted to take the
opportunity to provide a snapshot of what the current design looks like.

Cheers,

Chris

On Fri, May 21, 2021 at 4:32 PM Chris Egerton  wrote:

> Hi Tom,
>
> Wow, I was way off base! I was thinking that the intent of the fencible
> producer was to employ it by default with 3.0, as opposed to only after the
> worker-level
> "exactly.once.source.enabled" property was flipped on. You are correct
> that with the case you were actually describing, there would be no
> heightened ACL requirements, and that it would leave room in the future for
> exactly-once to be disabled on a per-connector basis (as long as all the
> workers in the cluster already had "exactly.once.source.enabled" set to
> "true") with no worries about breaking changes.
>
> I agree that this is something for another KIP; even if we could squeeze
> it in in time for this release, it might be a bit much for new users to
> take in all at once. But I can add it to the doc as "future work" since
> it's a promising idea that could prove valuable to someone who might need
> per-connector granularity in the future.
>
> Thanks for clearing things up; in retrospect your comments make a lot more
> sense now, and I hope I've sufficiently addressed them by now.
>
> PSA for you and everyone else--I plan on updating the doc next week with
> the new APIs for connector-defined transaction boundaries,
> user-configurable transaction boundaries (i.e., poll vs. interval vs.
> connectors), and preflight checks for exactly-once validation (required vs.
> requested).
>
> Cheers,
>
> Chris
>
> On Fri, May 21, 2021 at 7:14 AM Tom Bentley  wrote:
>
>> Hi Chris,
>>
>> Thanks for continuing to entertain some of these ideas.
>>
>> On Fri, May 14, 2021 at 5:06 PM Chris Egerton wrote:
>>
>> > [...]
>> >
>> That's true, but we do go from three static ACLs (write/describe on a
>> fixed
>> > transactional ID, and idempotent write on a fixed cluster) to a dynamic
>> > collection of ACLs.
>> >
>>
>> I'm not quite sure I follow, maybe I've lost track. To be clear, I was
>> suggesting the use of a 'fencing producer' only in clusters with
>> exactly.once.source.enabled=true where I imagined the key difference
>> between the exactly once and fencing cases was how the producer was
>> configured/used (transactional vs this new fencing semantic). I think the
>> ACL requirements for connector producer principals would therefore be the
>> same as currently described in the KIP. The same is true for the worker
>> principals (which is the only breaking change you give in the KIP). So I
>> don't think the fencing idea changes the backwards compatibility story
>> that's already in the KIP, just allows a safe per-connector
>> exactly.once=disabled option to be supported (with required as requested
>> as
>> we already discussed).
>>
>> But I'm wondering whether I've overlooked something.
>>
>> Ultimately I think it may behoove us to err on the side of reducing the
>> > breaking changes here for now and saving them for 4.0 (or some later
>> major
>> > release), but would be interested in thoughts from you and others.
>> >
>>
>> Difficult to answer (given I think I might be missing something).
>> If there are breaking changes then I don't disagree. It's difficult to
>> reason about big changes like this without some practical experience.
>> If there are not, then I think we could also implement the whole
>> exactly.once=disabled thing in a later KIP without additional breaking
>> changes (i.e. some time in 3.x), right?
>>
>>
>> > > Guozhang also has a (possible) use case for a fencing-only producer (
>> > https://issues.apache.org/jira/browse/KAFKA-12693), and as he points
>> out
>> > there, you should be able to get these semantics today by calling
>> > initTransactions() and then just using the producer as normal (no
>> > beginTransaction()/abortTransaction()/endTransaction()).
>> >
>> > I tested this locally and was not met with success; transactional
>> producers
>> > do a check right now to ensure that any calls to "KafkaProducer::send"
>> > occur within a transaction (see
>> >
>> >
>> https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L957-L959
>> > and
>> >
>> >
>> https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L450-L451
>> > ).
>> > Not a blocker, just noting that we'd have to do some legwork to make
>> this
>> > workable with the producer API.
>> >
>>
>> Ah, sorry, I should have actually tried it rather than just taking a quick
>> look at the code.
>>
>> Rather than remove those s

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-21 Thread Chris Egerton
Hi Tom,

Wow, I was way off base! I was thinking that the intent of the fencible
producer was to employ it by default with 3.0, as opposed to only after the
worker-level
"exactly.once.source.enabled" property was flipped on. You are correct that
with the case you were actually describing, there would be no heightened
ACL requirements, and that it would leave room in the future for
exactly-once to be disabled on a per-connector basis (as long as all the
workers in the cluster already had "exactly.once.source.enabled" set to
"true") with no worries about breaking changes.

I agree that this is something for another KIP; even if we could squeeze it
in in time for this release, it might be a bit much for new users to take
in all at once. But I can add it to the doc as "future work" since it's a
promising idea that could prove valuable to someone who might need
per-connector granularity in the future.

Thanks for clearing things up; in retrospect your comments make a lot more
sense now, and I hope I've sufficiently addressed them by now.

PSA for you and everyone else--I plan on updating the doc next week with
the new APIs for connector-defined transaction boundaries,
user-configurable transaction boundaries (i.e., poll vs. interval vs.
connectors), and preflight checks for exactly-once validation (required vs.
requested).

Cheers,

Chris

On Fri, May 21, 2021 at 7:14 AM Tom Bentley  wrote:

> Hi Chris,
>
> Thanks for continuing to entertain some of these ideas.
>
> On Fri, May 14, 2021 at 5:06 PM Chris Egerton wrote:
>
> > [...]
> >
> That's true, but we do go from three static ACLs (write/describe on a fixed
> > transactional ID, and idempotent write on a fixed cluster) to a dynamic
> > collection of ACLs.
> >
>
> I'm not quite sure I follow, maybe I've lost track. To be clear, I was
> suggesting the use of a 'fencing producer' only in clusters with
> exactly.once.source.enabled=true where I imagined the key difference
> between the exactly once and fencing cases was how the producer was
> configured/used (transactional vs this new fencing semantic). I think the
> ACL requirements for connector producer principals would therefore be the
> same as currently described in the KIP. The same is true for the worker
> principals (which is the only breaking change you give in the KIP). So I
> don't think the fencing idea changes the backwards compatibility story
> that's already in the KIP, just allows a safe per-connector
> exactly.once=disabled option to be supported (with required as requested as
> we already discussed).
>
> But I'm wondering whether I've overlooked something.
>
> Ultimately I think it may behoove us to err on the side of reducing the
> > breaking changes here for now and saving them for 4.0 (or some later
> major
> > release), but would be interested in thoughts from you and others.
> >
>
> Difficult to answer (given I think I might be missing something).
> If there are breaking changes then I don't disagree. It's difficult to
> reason about big changes like this without some practical experience.
> If there are not, then I think we could also implement the whole
> exactly.once=disabled thing in a later KIP without additional breaking
> changes (i.e. some time in 3.x), right?
>
>
> > > Guozhang also has a (possible) use case for a fencing-only producer (
> > https://issues.apache.org/jira/browse/KAFKA-12693), and as he points out
> > there, you should be able to get these semantics today by calling
> > initTransactions() and then just using the producer as normal (no
> > beginTransaction()/abortTransaction()/endTransaction()).
> >
> > I tested this locally and was not met with success; transactional
> producers
> > do a check right now to ensure that any calls to "KafkaProducer::send"
> > occur within a transaction (see
> >
> >
> https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L957-L959
> > and
> >
> >
> https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L450-L451
> > ).
> > Not a blocker, just noting that we'd have to do some legwork to make this
> > workable with the producer API.
> >
>
> Ah, sorry, I should have actually tried it rather than just taking a quick
> look at the code.
>
> Rather than remove those safety checks I suppose we'd need a way of
> distinguishing, in the config, the difference in semantics. E.g. Something
> like a fencing.id config, which was mutually exclusive with
> transactional.id.
> Likewise perhaps initFencing() alongside initTransactions() in the API. But
> I think at this point it's something for another KIP.
>
> Kind regards,
>
> Tom
>


Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-21 Thread Tom Bentley
Hi Chris,

Thanks for continuing to entertain some of these ideas.

On Fri, May 14, 2021 at 5:06 PM Chris Egerton 
wrote:

> [...]
>
That's true, but we do go from three static ACLs (write/describe on a fixed
> transactional ID, and idempotent write on a fixed cluster) to a dynamic
> collection of ACLs.
>

I'm not quite sure I follow, maybe I've lost track. To be clear, I was
suggesting the use of a 'fencing producer' only in clusters with
exactly.once.source.enabled=true where I imagined the key difference
between the exactly once and fencing cases was how the producer was
configured/used (transactional vs this new fencing semantic). I think the
ACL requirements for connector producer principals would therefore be the
same as currently described in the KIP. The same is true for the worker
principals (which is the only breaking change you give in the KIP). So I
don't think the fencing idea changes the backwards compatibility story
that's already in the KIP, just allows a safe per-connector
exactly.once=disabled option to be supported (with required as requested as
we already discussed).

But I'm wondering whether I've overlooked something.

Ultimately I think it may behoove us to err on the side of reducing the
> breaking changes here for now and saving them for 4.0 (or some later major
> release), but would be interested in thoughts from you and others.
>

Difficult to answer (given I think I might be missing something).
If there are breaking changes then I don't disagree. It's difficult to
reason about big changes like this without some practical experience.
If there are not, then I think we could also implement the whole
exactly.once=disabled thing in a later KIP without additional breaking
changes (i.e. some time in 3.x), right?


> > Guozhang also has a (possible) use case for a fencing-only producer (
> https://issues.apache.org/jira/browse/KAFKA-12693), and as he points out
> there, you should be able to get these semantics today by calling
> initTransactions() and then just using the producer as normal (no
> beginTransaction()/abortTransaction()/endTransaction()).
>
> I tested this locally and was not met with success; transactional producers
> do a check right now to ensure that any calls to "KafkaProducer::send"
> occur within a transaction (see
>
> https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L957-L959
> and
>
> https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L450-L451
> ).
> Not a blocker, just noting that we'd have to do some legwork to make this
> workable with the producer API.
>

Ah, sorry, I should have actually tried it rather than just taking a quick
look at the code.

Rather than remove those safety checks I suppose we'd need a way of
distinguishing, in the config, the difference in semantics. E.g. Something
like a fencing.id config, which was mutually exclusive with transactional.id.
Likewise perhaps initFencing() alongside initTransactions() in the API. But
I think at this point it's something for another KIP.
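Purely to illustrate the hypothetical shape (neither fencing.id nor
initFencing() exists today, so none of this is real API):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("fencing.id", "my-connect-cluster"); // hypothetical; mutually exclusive with transactional.id
Producer<byte[], byte[]> producer = new KafkaProducer<>(
    props, new ByteArraySerializer(), new ByteArraySerializer());
producer.initFencing(); // hypothetical analogue of initTransactions()
// ...followed by plain send() calls, with no beginTransaction()/commitTransaction() bracketing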

Kind regards,

Tom


Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-19 Thread Chris Egerton
Hey Jeremy,

Thanks for taking a look! Always nice to have input from connector
developers, especially ones as prolific as you.

I was hoping to leave connector-defined transaction boundaries for future
work as the use cases for them were unclear. For example, with transactions
in an upstream data source, if a connector developer wants to preserve
those transactions downstream, their connector can produce source records
representing those transaction boundaries, and allow users to then perform
their own filtering logic based on those records. Alternatively, the
connector can do that filtering itself by refusing to produce any records
until the upstream transaction from which they are derived is committed.

Given that it's pretty much a sure thing at this point that producer
transaction commits will not subdivide task-defined batches (where a batch
is the collection of records returned in a single call to
"SourceTask::put"), I wonder if the Spooldir connector could follow that
second approach: buffer all the records for a file and then provide them to
Connect all in one batch. And, if this is infeasible for large files, the
question then becomes--how would we expect downstream applications to
handle this? A large amount of the discussion here recently has been
centered around the problems posed by large transactions, both in terms of
increased latency and heightened memory requirements for consumers (which
would have to buffer all the records in that transaction locally until a
commit marker is encountered, on a per-topic-partition basis).

As far as heightened latency goes, if you need whole files written at once,
it seems like a reasonable tradeoff to make that probably won't ruffle a
lot of feathers (although your input here to validate that assumption would
be valuable).

With regards to memory requirements--I guess it's not out of the question
to suggest that users who truly need atomic transfer of entire files at a
time be equipped with the right hardware to be able to support it. In
downstream applications this is unavoidable until/unless broker-side
transaction filtering is implemented. However, if we allow
connector-defined transaction boundaries, connector tasks wouldn't need the
additional memory as they would be able to send records to Connect as they
read them, which would respond by dispatching them to a producer on the
fly. On the other hand, if tasks have to locally buffer records in order to
be able to return them all in a single batch, then we inflict that new,
possibly-painful memory requirement on Connect workers as well.


Okay, stream of consciousness over. I think I see a valid use case for
connector-defined transaction boundaries. That leaves us with the task of
designing an API for connector developers, and deciding on how to provide
this option to users (some may not need or want to respect the transaction
boundaries defined by their connector).


As far as the API goes, I think the approach you mentioned offline of using
the source task context makes sense for the most part, but would introduce
some small cross-compatibility issues where it would become more difficult
to run newer connectors on older versions of Connect. Instead, we could
take a page out of KIP-610's book and do something like this:

public interface SourceTaskContext {
    // Existing methods and fields omitted in this snippet

    public TransactionContext transactionContext();
}

A new TransactionContext interface would be introduced. Tasks can grab an
instance of it on startup from the source task context and use it to
communicate transaction boundaries to the worker over the course of their
lifetime. If the worker is running an older version of Connect that doesn't
support this API, they'll be met with a classloading error during the
initial call to "transactionContext()", and will be able to deduce from it
that they won't be able to define their own transaction boundaries. The
benefit is that, if this is all done on task startup, the classloading
error will only have to be caught once, instead of every time any of the
transaction-related methods are invoked.
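As a rough sketch of what that startup guard might look like in a task (the
surrounding task code is hypothetical; only "SourceTaskContext::transactionContext"
is part of this proposal):

private TransactionContext transactionContext;

@Override
public void start(Map<String, String> props) {
    try {
        // Fails on workers running an older version of Connect without this method
        transactionContext = context.transactionContext();
    } catch (NoSuchMethodError | NoClassDefFoundError e) {
        // Older runtime; fall back to framework-defined transaction boundaries
        transactionContext = null;
    }
    // remainder of task startup...
}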

For the TransactionContext interface:

public interface TransactionContext {
    public void commitTransaction();

    public void commitTransactionOn(SourceRecord record);

    public void abortTransaction();

    public void abortTransactionOn(SourceRecord record);
}

There would be no API for defining when a transaction begins, since we can
assume that transactions begin when the first record is provided and
immediately after every committed or aborted transaction. The method
variants that accept SourceRecord instances would allow connectors to
completely decouple record batches from transaction boundaries; they could
produce single record batches with multiple transactions inside them, or
define transactions that start in the middle of one batch and end in the
middle of another. Tasks can be made aware of when transactions are
completed (eith

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-19 Thread Jeremy Custenborder
Hey Chris!

Nice work on this KIP!

What are the thoughts about letting the connector developer control
the boundaries of the transaction? For example kafka-connect-spooldir
is used to parse and import files to kafka. It would be amazing if I
could begin and end the transaction as I open and close files. This
would allow me to guard against the dreaded exception at line 732 and wrap
an entire file in a transaction. It either makes it or it doesn't.

J


On Tue, May 18, 2021 at 10:04 AM Chris Egerton
 wrote:
>
> Hi Randall,
>
> Thanks for clarifying the issues with large transactions. I think we're
> starting to converge on an understanding of the problem space, if not quite
> on an approach to tackle it.
>
> I think my biggest issue with defining transaction boundaries on a
> per-task-poll basis are these points that you make:
>
> > First, it reuses something that connector implementations
> already employ, and which many connectors have configs to tune the behavior
> to maximize throughput. Second, it requires no additional configuration
> properties or connector-specific behavior. Third, high-throughput
> connectors are likely returning large batches, so write amplification for
> the extra offset record per source partition is likely to have a much
> smaller impact (assuming that most high throughput connectors have high
> records-per-source-partition ratios).
>
> I don't believe the first and third points are safe assumptions to make.
> There's actually very little, if any, performance benefit to writing source
> connectors whose tasks give Connect larger batches (at least, not as far as
> the framework logic goes). Every record is sequentially transformed,
> converted, and dispatched to the producer, regardless of whether it came
> from a batch of one or one million. So Connect does have the capability
> right now to support high-throughput, small-batch connectors.
>
> For a concrete example, Confluent's Datagen connector (
> https://github.com/confluentinc/kafka-connect-datagen), which is used to
> produce sample data for quickstarts and demos, performs just fine even
> though it does absolutely no batching whatsoever and returns only a single
> record per call to "SourceTask::Poll". In some not-super-rigorous
> performance testing, I ran the connector three times against a local build
> of the 2.8.0 release of Connect, then modified the 2.8.0 release of Connect
> to perform a producer flush for every task-provided batch of records, then
> ran the connector three times against that. Each test run produced exactly
> one million records. The worst run out of the initial cases (against the
> unmodified local 2.8.0 build) took 15 seconds to complete. The best run out
> of the subsequent cases (against the modified local 2.8.0 build) took 2
> minutes and 44 seconds, or 164 seconds--over a 10x slowdown. And this was
> against a single local broker with no replication factor.
>
> Of course, if it's a matter of accommodating this one (demo-oriented)
> connector, then it might still be worth it to put the onus on the
> developers of that connector to modify it appropriately to work with
> exactly-once support. But my point with this connector is more
> general--simply put, there don't appear to be safe grounds for the
> assumption that source tasks must produce large batches in order to achieve
> high throughput.
>
> > Yes, per-connector offset commit intervals is one approach that would be
> more explicit, though see my concerns earlier in this email about
> controlling the number of records in a transaction by a time-based config,
> even if set on a per-connector basis.
>
> I'll try to summarize the concerns presented; let me know if I'm missing
> something:
>
> 1. Large transaction sizes can inflate the memory requirements for
> consumers and possibly even overwhelm them with out-of-memory errors.
> 2. High offset commit intervals can inflate the read latency of downstream
> applications.
>
> Combining both of these concerns with the high-throughput-small-batch
> scenario, it still seems worthwhile to provide users with a way to do some
> multi-batch transactions for their source tasks. This is analogous to
> consumer-side buffering; yes, by default you probably want events to be
> available downstream as soon as possible, but in some cases it's still
> necessary to introduce a small hit in latency in order to keep up with high
> throughput. And if each batch is relatively small, then heightened memory
> requirements for consumers shouldn't really be a problem.
>
> And for other scenarios, with per-connector offset commit intervals, users
> can always just set the interval to zero to get exactly the behavior that
> you're describing.
>
> > I'm not sure that setting up a separate Connect cluster is that practical
> for many Connect users, especially in larger organizations where one group
> manages the cluster and others manage connectors.
>
> I agree; I was mostly responding to what I perceived as the assumptio

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-18 Thread Chris Egerton
Hi Randall,

Thanks for clarifying the issues with large transactions. I think we're
starting to converge on an understanding of the problem space, if not quite
on an approach to tackle it.

I think my biggest issue with defining transaction boundaries on a
per-task-poll basis is these points that you make:

> First, it reuses something that connector implementations
already employ, and which many connectors have configs to tune the behavior
to maximize throughput. Second, it requires no additional configuration
properties or connector-specific behavior. Third, high-throughput
connectors are likely returning large batches, so write amplification for
the extra offset record per source partition is likely to have a much
smaller impact (assuming that most high throughput connectors have high
records-per-source-partition ratios).

I don't believe the first and third points are safe assumptions to make.
There's actually very little, if any, performance benefit to writing source
connectors whose tasks give Connect larger batches (at least, not as far as
the framework logic goes). Every record is sequentially transformed,
converted, and dispatched to the producer, regardless of whether it came
from a batch of one or one million. So Connect does have the capability
right now to support high-throughput, small-batch connectors.

For a concrete example, Confluent's Datagen connector (
https://github.com/confluentinc/kafka-connect-datagen), which is used to
produce sample data for quickstarts and demos, performs just fine even
though it does absolutely no batching whatsoever and returns only a single
record per call to "SourceTask::poll". In some not-super-rigorous
performance testing, I ran the connector three times against a local build
of the 2.8.0 release of Connect, then modified the 2.8.0 release of Connect
to perform a producer flush for every task-provided batch of records, then
ran the connector three times against that. Each test run produced exactly
one million records. The worst run out of the initial cases (against the
unmodified local 2.8.0 build) took 15 seconds to complete. The best run out
of the subsequent cases (against the modified local 2.8.0 build) took 2
minutes and 44 seconds, or 164 seconds--over a 10x slowdown. And this was
against a single local broker with no replication factor.

Of course, if it's a matter of accommodating this one (demo-oriented)
connector, then it might still be worth it to put the onus on the
developers of that connector to modify it appropriately to work with
exactly-once support. But my point with this connector is more
general--simply put, there don't appear to be safe grounds for the
assumption that source tasks must produce large batches in order to achieve
high throughput.

> Yes, per-connector offset commit intervals is one approach that would be
more explicit, though see my concerns earlier in this email about
controlling the number of records in a transaction by a time-based config,
even if set on a per-connector basis.

I'll try to summarize the concerns presented; let me know if I'm missing
something:

1. Large transaction sizes can inflate the memory requirements for
consumers and possibly even overwhelm them with out-of-memory errors.
2. High offset commit intervals can inflate the read latency of downstream
applications.

Combining both of these concerns with the high-throughput-small-batch
scenario, it still seems worthwhile to provide users with a way to do some
multi-batch transactions for their source tasks. This is analogous to
consumer-side buffering; yes, by default you probably want events to be
available downstream as soon as possible, but in some cases it's still
necessary to introduce a small hit in latency in order to keep up with high
throughput. And if each batch is relatively small, then heightened memory
requirements for consumers shouldn't really be a problem.

And for other scenarios, with per-connector offset commit intervals, users
can always just set the interval to zero to get exactly the behavior that
you're describing.

> I'm not sure that setting up a separate Connect cluster is that practical
for many Connect users, especially in larger organizations where one group
manages the cluster and others manage connectors.

I agree; I was mostly responding to what I perceived as the assumption that
exactly-once support could be configured on a per-connector basis, which
hopefully we've clarified by now is not the case.

> Reducing throughput is a technical solution, especially if the only
alternative is that the task doesn't run. But IMO it's an impractical
technical solution as most users will want a real-time solution, and
decreasing a connector's throughput is the opposite of this.

Are you proposing that this option not be provided to users? I'm afraid if
we don't do that then it'll just become an undocumented workaround that a
handful of people are aware of (or, worse yet, have to discover for
themselves), but most people aren't, even

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-17 Thread Randall Hauch
On Mon, May 10, 2021 at 10:26 AM Chris Egerton 
wrote:

> RE transaction boundaries:
>
> > First, the offset commit interval is currently 60 seconds by default, and
> source connectors can produce a lot of records in that time. Until someone
> operating a Connect cluster changed that to a much lower value, it's
> possible that source connector performance could be significantly impacted.
> ...
> > At first blush,
> controlling transaction boundaries based upon batches seems like it would
> work better for high throughput connectors.
>
> Sorry, what's the concern about performance here? Write throughput for a
> transactional producer doesn't suffer as the number of records per
> transaction increases. I'm also worried about source connectors that return
> small batches with high frequency; those would not fare well with a lower
> offset commit interval.
>

Recall that with Kafka transactions, a consumer in read_committed mode
buffers messages that are part of a transaction until that transaction is
committed or aborted. When a consumer sees an aborted transaction marker
for that transaction, the consumer discards all buffered messages
associated with that transaction. When a consumer sees a commit transaction
marker, it then forwards those buffered messages that are associated with
that transaction to its application. Note the transaction markers and
buffered messages are per topic partition.
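
For reference, the consumer setting in question is just this (broker address,
group, and topic names are placeholders):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadCommittedExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "downstream-app");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // With read_committed, poll() only returns records from committed
        // transactions; records from aborted transactions are dropped, and
        // records in still-open transactions are not delivered until they commit.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("some-source-topic"));
            consumer.poll(Duration.ofSeconds(1));
        }
    }
}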

So I have two concerns with using time-based transaction boundaries.

The first is easier to understand: consumers using read_committed mode may
consume a lot of memory while buffering records that are part of active
transactions. In fact, the memory required is a function of the number of
records in the transaction in the assigned topic partitions. However, the
only way a Connect user can control the number of records in each
transaction is by reducing the offset commit interval used for _all
connectors_. This means that the Connect user cannot directly limit the
size of the transaction (and therefore the buffering memory required by the
consumers) but can only limit the maximum duration of the transactions.
This seems challenging at best, because the size of the transaction is a
function of the throughput and the offset commit interval.

My second concern is that applications consuming the topics to which a
source connector writes will perceive those writes as having a potentially
very high lag w/r/t the connector first seeing in the source system the
information for those records. The best case is that the source connector
discovers some information, generates a source record and hands it to
Connect, and Connect writes that record to the topic just before the offset
commit window closes and commits the transaction. A worse case is that the
source connector gives the record to Connect just after the offset commit
window closes (and the transaction is closed), and the transaction with
that record will not be committed for another commit interval. This means
the worst case *perceived lag* could be at least the offset commit interval.

So if we agree that it will be difficult for some Connect users to choose a
one-size-fits-all offset commit interval to work well for source connectors
with a range of throughputs and different consumer application
requirements, we may need to consider the ability for each connector to
control the transaction boundaries, albeit still somewhat indirectly. The
question then becomes how to specify the boundaries. Record counts alone
are insufficient, since a transaction could be 1 record short for some
time, resulting in the entire transaction timing out. Time/duration is also
not sufficient, since for low perceived lag this should be set as small as
is feasible (and thus poor for high throughput).

Using a separate transaction for each batch seems like a very worthwhile
compromise. First, it reuses something that connector implementations
already employ, and which many connectors have configs to tune the behavior
to maximize throughput. Second, it requires no additional configuration
properties or connector-specific behavior. Third, high-throughput
connectors are likely returning large batches, so write amplification for
the extra offset record per source partition is likely to have a much
smaller impact (assuming that most high throughput connectors have high
records-per-source-partition ratios). Likewise, low-throughput connectors
will either have very small or infrequent batches that will easily handle
the higher write amplification (up to 2x: one offset record for every
record). Regardless of the throughput, infrequent batches would have higher
lag anyway even without EOS, and EOS transactions coupled to those batches
would add minimal overhead.


>
> > It's true we already have
> that constraint now, but transactions and more importantly transaction size
> and latency add a completely different aspect to the calculation of how to
> set the offset commit interval for a Connect cluster.

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-14 Thread Chris Egerton
Hi Tom,

Really interesting turn this has taken! Responses inline.

> I'm not quite sure I follow what you mean here. Can you explain? AFAICT it
doesn't apply in a cluster where all tasks are using fencible producers,
but maybe I'm missing something.

Imagine this (unlikely but possible) scenario:

1. A connector is created with N tasks and uses the
fencible-but-not-transactional producer
2. Task N (as in, the task with the highest ID) is allocated to a worker
that then becomes a zombie
3. The connector is reconfigured to use N-1 tasks, and this takes effect on
all non-zombie workers in the cluster
4. The connector is reconfigured to enable full-on exactly-once support
(i.e., use of the transactional producer)

At this point, we would need to know to fence out task N that's running on
the zombie worker. This is what is accomplished in the current design with
the task count records in the config topic; even if the number of tasks in
a connector is decreased, the leader would be aware of the old, higher task
count for that connector, and know to fence out that many tasks.

I was only noting this for completeness' sake; there's nothing about this
requirement that renders your proposal impossible or even significantly
more difficult. We'd just have to make sure to do the task count record
bookkeeping for connectors regardless of whether they're exactly-once or
not, so that if a connector has exactly-once switched on without a cluster
roll in the middle, we'd know exactly how many tasks to fence out before
bringing up that first round of transactional producers.

> That will be the case for the new transactional cluster anyway.

That's true, but we do go from three static ACLs (write/describe on a fixed
transactional ID, and idempotent write on a fixed cluster) to a dynamic
collection of ACLs. In especially large organizations where the people that
administrate Connect clusters aren't necessarily the same as the people
that create and manage connectors this might cause some friction. Still,
since there are benefits to all users (regardless of requirements for
exactly-once delivery guarantees) in the form of fencible producers that
would, in many if not all circumstances, reduce duplicate writes, it's not
out of the question to argue for this change.

I also toyed with the question of "If we're going to require these new ACLs
unconditionally, what's stopping us from just enabling fully-fledged
exactly-once source support by default?". It'd be pretty straightforward to
include zombie fencing for free with this change, for example. The only
remaining blocker seems to be that the connector needs direct write and
read access to the offsets topic that it uses.

Ultimately I think it may behoove us to err on the side of reducing the
breaking changes here for now and saving them for 4.0 (or some later major
release), but would be interested in thoughts from you and others.

> Guozhang also has a (possible) use case for a fencing-only producer (
https://issues.apache.org/jira/browse/KAFKA-12693), and as he points out
there, you should be able to get these semantics today by calling
initTransactions() and then just using the producer as normal (no
> beginTransaction()/commitTransaction()/abortTransaction()).

I tested this locally and was not met with success; transactional producers
do a check right now to ensure that any calls to "KafkaProducer::send"
occur within a transaction (see
https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L957-L959
and
https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L450-L451).
Not a blocker, just noting that we'd have to do some legwork to make this
workable with the producer API.
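
For anyone following along, the pattern in question looks roughly like this
(broker address, transactional ID, and topic are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FencingOnlyProducerAttempt {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "connector-task-0");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // Fences any older producer instances using the same transactional.id.
            producer.initTransactions();
            // The "fencing-only" idea: skip beginTransaction() and produce as
            // normal. The check linked above rejects this send(), since the
            // producer requires an open transaction once transactional.id is set.
            producer.send(new ProducerRecord<>("some-topic", new byte[0]));
        }
    }
}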

> In light of that (and assuming you buy these arguments), I wonder how much
extra effort it would be to do for EOS-enabled clusters as part of this
KIP?

The extra effort wouldn't be negligible (expansion of the producer API,
more complexity in task count record logic, more thorough upgrade notes),
but ultimately I wouldn't object to the proposal because of the extra work
involved. What it really comes down to IMO is how aggressive we're willing
to be with the breaking changes we make for users. If a good argument can
be made for introducing new ACL requirements for every single connector
running on 3.0 and beyond, then I'd be happy to fold this into the KIP in
exchange for the ability to configure exactly-once support on per-connector
basis.

Really enjoying the fresh perspective you're bringing here, especially with
regards to the transactional producer internals and Kafka Streams use cases!

Cheers,

Chris

On Fri, May 14, 2021 at 10:07 AM Tom Bentley  wrote:

> Hi Chris,
>
> Thanks for the reply.
>
> "required"/"requested" sounds good to me. Likewise the pre-flight check and
> "PUT /{connectorTy

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-14 Thread Tom Bentley
Hi Chris,

Thanks for the reply.

"required"/"requested" sounds good to me. Likewise the pre-flight check and
"PUT /{connectorType}/config/validate".

The other half is we'd still need to
> track the number of tasks for that connector that would need to be fenced
> out if/when exactly-once for it were switched on.
>

I'm not quite sure I follow what you mean here. Can you explain? AFAICT it
doesn't apply in a cluster where all tasks are using fencible producers,
but maybe I'm missing something.

If we had the
> intermediate producer you describe at our disposal, and it were in use by
> every running source task for a given connector, we could probably enable
> users to toggle exactly-once on a per-connector basis, but it would also
> require new ACLs for all connectors.
>

That will be the case for the new transactional cluster anyway.

I think there is value to supporting connectors that don't use full-blown
transactions in an exactly-once cluster, because the overhead in a fencing
producer should be similar to an idempotent producer (which IIRC is about
3% above a non-idempotent producer). That's because we only need to make a
single InitProducerIdRequest, and thereafter the epoch check is tiny.

If that's right then many people would then be able to use a single cluster
for both exactly once and non-exactly once connectors (i.e. it would get
rid of the performance cost of running a non-EOS connector in an
exactly-once cluster). Only people who cared about the ~3% would need to
run "old-style" clusters using unfenced producers.

Guozhang also has a (possible) use case for a fencing-only producer (
https://issues.apache.org/jira/browse/KAFKA-12693), and as he points out
there, you should be able to get these semantics today by calling
initTransactions() and then just using the producer as normal (no
beginTransaction()/commitTransaction()/abortTransaction()).

In light of that (and assuming you buy these arguments), I wonder how much
extra effort it would be to do for EOS-enabled clusters as part of this
KIP?

Thanks again,

Tom

On Fri, May 14, 2021 at 2:14 AM Chris Egerton 
wrote:

> Hi Tom,
>
> I'm fine with an implicit mapping of connector-provided null to
> user-exposed UNKNOWN, if the design continues down that overall path.
>
> Allowing users to assert that a connector should support exactly-once
> sounds reasonable; it's similar to the pre-flight checks we already do for
> connector configurations such as invoking "Connector::validate" and
> ensuring that all of the referenced SMTs, Predicates, and Converter classes
> are present on the worker. In fact, I wonder if that's how we could
> implement it--as a preflight check. That way, Connector and Task instances
> won't even have the chance to fail; if the user states a requirement for
> exactly-once support but their connector configuration doesn't meet that
> requirement, we can fail the connector creation/reconfiguration request
> before even writing the new config to the config topic. We could also add
> this support to the "PUT /{connectorType}/config/validate" endpoint so that
> users could test exactly-once support for various configurations without
> having to actually create or reconfigure a connector. We could still fail
> tasks on startup if something slipped by (possibly due to connector
> upgrade) but it'd make the UX a bit smoother in most cases to fail faster.
>
> Since a possible use of the property is to allow future users to control
> exactly-once support on a per-connector basis, I wonder whether a binary
> property is sufficient here. Even if a connector doesn't support
> exactly-once, there could still be benefits to using a transactional
> producer with rounds of zombie fencing; for example, preventing duplicate
> task instances from producing data, which could be leveraged to provide
> at-most-once delivery guarantees. In that case, we'd want a way to signal
> to Connect that the framework should do everything it does to provide
> exactly-once source support, but not make the assertion on the connector
> config, and we'd end up providing three possibilities to users: required,
> best-effort, and disabled. It sounds like right now what we're proposing is
> that we expose only the first two and don't allow users to actually disable
> exactly-once support on a per-connector basis, but want to leave room for
> the third option in the future. With that in mind, "required/not_required"
> might not be the best fit. Perhaps "required"/"requested" for now, with
> "disabled" as the value that could be implemented later?
>
> RE: "Is the problem here simply that the zombie fencing provided by the
> producer is only available when using transactions, and therefore having a
> non-transactional producer in the cluster poses a risk of a zombie not
> being fenced?"--that's half of it. The other half is we'd still need to
> track the number of tasks for that connector that would need to be fenced
> out if/when exactly-once for it were switched on. 

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-13 Thread Chris Egerton
Hi Tom,

I'm fine with an implicit mapping of connector-provided null to
user-exposed UNKNOWN, if the design continues down that overall path.

Allowing users to assert that a connector should support exactly-once
sounds reasonable; it's similar to the pre-flight checks we already do for
connector configurations such as invoking "Connector::validate" and
ensuring that all of the referenced SMTs, Predicates, and Converter classes
are present on the worker. In fact, I wonder if that's how we could
implement it--as a preflight check. That way, Connector and Task instances
won't even have the chance to fail; if the user states a requirement for
exactly-once support but their connector configuration doesn't meet that
requirement, we can fail the connector creation/reconfiguration request
before even writing the new config to the config topic. We could also add
this support to the "PUT /{connectorType}/config/validate" endpoint so that
users could test exactly-once support for various configurations without
having to actually create or reconfigure a connector. We could still fail
tasks on startup if something slipped by (possibly due to connector
upgrade) but it'd make the UX a bit smoother in most cases to fail faster.

Since a possible use of the property is to allow future users to control
exactly-once support on a per-connector basis, I wonder whether a binary
property is sufficient here. Even if a connector doesn't support
exactly-once, there could still be benefits to using a transactional
producer with rounds of zombie fencing; for example, preventing duplicate
task instances from producing data, which could be leveraged to provide
at-most-once delivery guarantees. In that case, we'd want a way to signal
to Connect that the framework should do everything it does to provide
exactly-once source support, but not make the assertion on the connector
config, and we'd end up providing three possibilities to users: required,
best-effort, and disabled. It sounds like right now what we're proposing is
that we expose only the first two and don't allow users to actually disable
exactly-once support on a per-connector basis, but want to leave room for
the third option in the future. With that in mind, "required/not_required"
might not be the best fit. Perhaps "required"/"requested" for now, with
"disabled" as the value that could be implemented later?

RE: "Is the problem here simply that the zombie fencing provided by the
producer is only available when using transactions, and therefore having a
non-transactional producer in the cluster poses a risk of a zombie not
being fenced?"--that's half of it. The other half is we'd still need to
track the number of tasks for that connector that would need to be fenced
out if/when exactly-once for it were switched on. If we had the
intermediate producer you describe at our disposal, and it were in use by
every running source task for a given connector, we could probably enable
users to toggle exactly-once on a per-connector basis, but it would also
require new ACLs for all connectors. Even though we're allowed to make
breaking changes with the upcoming 3.0 release, I'm not sure the tradeoff
is worth it. I suppose we could break down exactly-once support into two
separate config properties--a worker-level property, that causes all source
tasks on the worker to use producers that can be fenced (either full-on
transactional producers or "intermediate" producers), and a per-connector
property, that toggles whether the connector itself uses a full-on
transactional producer or just an intermediate producer (and whether or not
zombie fencing is performed for new task configs). This seems like it might
be overkill for now, though.

As far as the zombie fencing endpoint goes--the behavior will be the same
either way w/r/t the exactly.once.source.enabled property. The property
will dictate whether the endpoint is used by tasks, but it'll be available
for use no matter what. This is how a rolling upgrade becomes possible;
even if the leader hasn't been upgraded yet (to set
exactly.once.source.enabled to true), it will still be capable of handling
fencing requests from workers that have already been upgraded.

Cheers,

Chris

On Wed, May 12, 2021 at 5:33 AM Tom Bentley  wrote:

> Hi Chris and Randall,
>
> I can see that for connectors where exactly once is configuration-dependent
> it makes sense to use a default method. The problem with having an explicit
> UNKNOWN case is we really want connector developers to _not_ use it. That
> could mean it's deprecated from the start. Alternatively we could omit it
> from the enum and use null to mean unknown (we'd have to check for a null
> result anyway), with the contract for the method being that it should
> return non-null. Of course, this doesn't remove the ambiguous case, but
> avoids the need to eventually remove UNKNOWN in the future.
>
> I think there's another way for a worker to use the value too: Imagine
> you're deploying a connector t

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-12 Thread Tom Bentley
Hi Chris and Randall,

I can see that for connectors where exactly once is configuration-dependent
it makes sense to use a default method. The problem with having an explicit
UNKNOWN case is we really want connector developers to _not_ use it. That
could mean it's deprecated from the start. Alternatively we could omit it
from the enum and use null to mean unknown (we'd have to check for a null
result anyway), with the contract for the method being that it should
return non-null. Of course, this doesn't remove the ambiguous case, but
avoids the need to eventually remove UNKNOWN in the future.

I think there's another way for a worker to use the value too: Imagine
you're deploying a connector that you need to be exactly once. It's awkward
to have to query the REST API to determine that exactly once was working,
especially if you need to do this after config changes too. What you
actually want is to make an EOS assertion, via a connector config (e.g.
require.exactly.once=true, or perhaps exactly.once=required/not_required),
which would fail the connector/task if exactly once could not be provided.

The not_required case wouldn't disable the transactional runtime
environment, simply not guarantee that it was providing EOS. Although it
would leave the door open to supporting mixed EOS/non-transactional
deployments in the cluster in the future, if that became possible (i.e. we
could retrospectively make not_required mean no transactions).

On the subject of why it's not possible to enable exactly once on a
per-connector basis: Is the problem here simply that the zombie fencing
provided by the producer is only available when using transactions, and
therefore having a non-transactional producer in the cluster poses a risk
of a zombie not being fenced? This makes me wonder whether there's a case
for a producer with zombie fencing that is not transactional (intermediate
between idempotent and transactional producer). IIUC this would need to
make a InitProducerId request and use the PID in produce requests, but
could dispense with the other transactional RPCs. If such a thing existed
would the zombie fencing it provided be sufficient to provide safe
semantics for running a non-EOS connector in an EOS-capable cluster?

The endpoint for zombie fencing: It's not described how this works when
exactly.once.source.enabled=false

Cheers,

Tom


Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-10 Thread Chris Egerton
Thanks for your thoughts, Randall. Responses below:


RE exactly-once support in the REST API:

> The worker has to do (or plan to do) something with the information about a
connector's support for EOS, whether that's via an annotation or a method.
Otherwise, what's the point of requiring the connector to expose this
information? But the problem I see with this whole concept is that there
will still be ambiguity. For example, if the method returns `UNKNOWN` by
default, the connector could still be written in a way where EOS does work
with the connector. Yet what are users supposed to do when they see
"UNKNOWN"?

I think the idea here is that it'd be nice to have a simple, common
interface for connector developers to expose this information to users. The
natural alternative is to just add a note to the documentation for that
specific connector, but then we've lost the "common" part of that, and
possibly the "simple" (depending on how searchable the docs for the
connector are and how complex the logic of exactly-once support is). For
some sink connectors out there exactly-once support is complex enough to
warrant things like entire pages of documentation and even flowchart
diagrams, and as that complexity rises, user experience gets degraded.

As far as how to handle "UNKNOWN" goes--users can go the usual routes that
they already can for sink connectors: search documentation and reach out to
connector developers. However, once the question of exactly-once support
comes to these connector developers, they will also likely become aware of
the new API for surfacing this information to users, and can take advantage
of it to save themselves the trouble of having to personally answer these
questions in the future. Any connector developers who are already aware of
exactly-once support for source connectors coming to Connect can leverage
the new API either instead of or in addition to expanding docs for their
connectors.

This can be thought of almost like a non-expiring caching layer for
documentation; yes, there are going to be some cache misses (i.e.,
"UNKNOWN"), but whenever one of those misses occurs, there's now an option
to cache the requested information in a more convenient, standard location
for future users.


RE transaction boundaries:

> First, the offset commit interval is currently 60 seconds by default, and
source connectors can produce a lot of records in that time. Until someone
operating a Connect cluster changed that to a much lower value, it's
possible that source connector performance could be significantly impacted.
...
> At first blush,
controlling transaction boundaries based upon batches seems like it would
work better for high throughput connectors.

Sorry, what's the concern about performance here? Write throughput for a
transactional producer doesn't suffer as the number of records per
transaction increases. I'm also worried about source connectors that return
small batches with high frequency; those would not fare well with a lower
offset commit interval.

> It's true we already have
that constraint now, but transactions and more importantly transaction size
and latency add a completely different aspect to the calculation of how to
set the offset commit interval for a Connect cluster.
...
> If we're committing transactions every 60 seconds (or even as frequently as
every 5 seconds), then the _perceived lag_ will be significantly higher
with transactions than without.

That's a fair point. High-throughput connectors are likely to benefit from
higher commit intervals so that they have to pause to commit offsets, and
produce offset records, less frequently. Low-latency connectors are likely
to benefit from lower commit intervals in order to shorten the time between
source records being produced to Kafka and their transactions being
committed. There's no reason to assume that there's a one-size-fits-all
offset commit interval for an entire cluster once offset commit becomes a
requirement for records to be visible to downstream consumers.

> Have you considered having the worker create a separate transaction for
each batch of records returned by the connector?

I had not considered this, but for reasons outlined above regarding
performance with high-throughput connectors, I'm hesitant to put it into
play by default.

Given the conflicting requirements of high-throughput and low-latency
connectors with regards to offset commit, I do agree that for some
connectors, the most useful default behavior would be to commit offsets as
soon as possible, and committing once for every batch returned from
"SourceTask::poll" is a great way to allow that.

The most straightforward way to accommodate these conflicting cases seems
to be to just go ahead and allow per-connector offset commit intervals.
That way, the user gets to define an approximate upper bound on the time
between a record being produced and its transaction being committed--and if
they want the upper bound to be zero, they can just configure 

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-08 Thread Randall Hauch
Thanks once again for the KIP and the updates, Chris. As mentioned in my
prior email, here are my more general comments/questions (other than my
earlier question about the transaction boundaries). Hopefully these are
readable:

The worker-level offset.flush.timeout.ms property will be ignored for
> exactly-once source tasks. They will be allowed to take as long as
> necessary to complete an offset commit, since the cost of failure at that
> point is to fail the source task. Currently, all source task offset commits
> take place on a single shared worker-global thread. In order to support
> source task commits without a timeout, but also prevent laggy tasks from
> disrupting the availability of other tasks on the cluster, the worker will
> be modified to permit simultaneous source task offset commits.


This seems to contradict the earlier quote. Specifically, the first quote
above states that transaction boundaries are dictated by offset flushes,
which is controlled by the `offset.flush.interval.ms` property. However,
the second quote says the `offset.flush.timeout.ms` property will be
ignored for EOS source tasks. I must be missing something.

It may take longer than the transaction timeout for a task to flush all of
> its records to Kafka. In this case, there are two actions that users can
> take to nurse their connector back to health: reconfigure it to produce
> records with a lower throughput, or increase the transaction timeout for
> the producers used by the connector.


Does it seem realistic or practical to suggest reducing the source
connector's throughput?

Finally, it allows users to limit the effect that hanging transactions on
> an offsets topic will have. If tasks A and B use the same offsets topic,
> and task A initiates a transaction on that offsets topic right before task
> B starts up, then task A dies suddenly without committing its transaction,
> task B will have to wait for that transaction to time out before it can
> read to the end of the offsets topic. If the transaction timeout is set
> very high for task A (to accommodate bursts of high throughput, for
> example), this will block task B from processing any data for a long time.
> Although this scenario may be unavoidable in some cases, using a dedicated
> offsets topic for each connector should allow cluster administrators to
> isolate the blast radius of a hanging transaction on an offsets topic. This
> way, although tasks of the same connector may still interfere with each
> other, they will at least not interfere with tasks of other connectors.
> This should be sufficient for most multitenant environments.


Do you mean to use "task A" and "task B" here? Do they imply tasks from the
same connector? If so, then won't the offsets from both of those tasks
still be written to the same connector-specific offset topic, and suffer
from the potential blocking issue mentioned above? While it's useful to
call this out, I'm not sure how that example helps support the motivation
for a separate per-connector offset topic. OTOH, if these were tasks from
_different_ connectors, then it becomes clear that the offsets from one
source connector using a connector-specific offsets topic will never block
the offsets from another connector using a different connector-specific
offsets topic. Thus, having each connector use separate connector-specific
offset topics at least avoids the problem of one connector's tasks blocking
the tasks of the other connector just because of transaction commit timeout
issues.

Regardless of whether exactly.once.source.enabled is set to true for the
> worker, if a connector configuration contains a value for the
> offsets.storage.topic  property, it will use an offsets topic with that
> name on the Kafka cluster that it produces data to (which may be different
> from the one that hosts the worker's global offsets topic).


It's implied but not explicitly stated that this will result in duplicating
the offsets for EOS source connectors: the worker will continue to track
all source offsets in its own offsets topic, and EOS-enabled source
connectors will also track their own offsets in connector-specific offsets
topics. It might be worth making that more obvious, and which will be used
upon connector restarts.

If a connector is explicitly or implicitly configured to use a separate
> offsets topic but that topic does not exist yet, task and connector
> instances will automatically try to create the topic before startup.


Are you suggesting that the source connector and task implementations be
responsible for this? Or are you suggesting that the worker will be
responsible for creating the offsets topics on behalf of source connector
and task implementations (using an admin client per the configurations),
just like the worker currently does so for the cluster's offsets topic?
Please clarify this in the KIP.

If a worker is active and does not have support for exactly-once delivery...


Do you think it's worthwhile add

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-08 Thread Randall Hauch
Thanks again for the continued improvements to this KIP, Chris. I have a
number of questions that I'll enumerate in a subsequent email, but first I
wanted to make a higher-level comment.

I don't know what to make about the proposal to rely upon the existing
offset commit interval as the way to control the transaction boundaries.
Specifically, the KIP says:

> The timing for offset commits will remain the same as it is today; they
> will be triggered periodically on a fixed interval that users can adjust
> via the offset.flush.interval.ms property. Transaction boundaries and
> record batches will be defined by these offset commits; expanded support
> for transaction boundary definition can be added later and is noted in the
> "future work" section.
>

It's not clear to me why we're coupling the transaction boundaries to the
existing offset flush behavior. (BTW, I didn't see a rejected alternative
that addresses this. There is one about "Connector-defined transaction
boundaries", but I'm not suggesting that we let connectors explicitly
control the transaction boundaries.)

First, the offset commit interval is currently 60 seconds by default, and
source connectors can produce a lot of records in that time. Until someone
operating a Connect cluster changed that to a much lower value, it's
possible that source connector performance could be significantly impacted.

Second, there is only one worker-level setting for the offset commit
interval, which means that this one setting would have to be used for
source connectors with high and low throughput. It's true we already have
that constraint now, but transactions and more importantly transaction size
and latency add a completely different aspect to the calculation of how to
set the offset commit interval for a Connect cluster.

Third and perhaps most importantly, a record produced by a source connector
and written to the Kafka topic outside of a transaction is visible to a
consumer (worst case) immediately after that record has been replicated to
all of the in-sync replicas. However, if the producer writes that record in
a transaction, then that record will be visible to a consumer (using
`isolation.level=read_committed`) only after that transaction is committed.
If we're committing transactions every 60 seconds (or even as frequently as
every 5 seconds), then the _perceived lag_ will be significantly higher
with transactions than without. And that seems to go against the last two
goals of the KIP: "Minimize gotchas and potential footguns" and "Overall,
make this a feature that gives people joy to use, not pain".

Have you considered having the worker create a separate transaction for
each batch of records returned by the connector? Yes we'd still want to add
the appropriate offset topic record(s) at the end of each transaction, but
this seems to align much more closely with the existing mechanisms
connector developers have and any existing configuration tuning options
connector developers and their users already have. And the existing
worker-level source offsets could still be flushed with the current
behavior, since the worker-level (non-transactional) offsets don't need to
be flushed at the same time as the transaction boundaries. At first blush,
controlling transaction boundaries based upon batches seems like it would
work better for high throughput connectors. It is true that low-throughput
connectors would result in higher write amplification (worst case being 2x
when each batch returns a single record), but for low-throughput connectors
this seems like it might be an acceptable tradeoff if EOS is really
needed, or if not then EOS can be disabled for this connector. This would
add an implicit mapping between the batches and transactions, but I still
think we'd want to eventually allow in the future a source connector to
_explicitly_ control the transaction boundaries within a single batch.
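
To sketch what I mean (purely illustrative--the helpers and class name are made
up, and the real worker pipeline with converters, transformations, retries, and
offset serialization is omitted):

import java.util.List;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

final class PerBatchTransactionLoop {

    // Assumes producer.initTransactions() has already been called once at startup.
    void pollAndCommitOnce(SourceTask task, KafkaProducer<byte[], byte[]> producer)
            throws InterruptedException {
        List<SourceRecord> batch = task.poll();
        if (batch == null || batch.isEmpty()) {
            return;
        }
        producer.beginTransaction();
        try {
            for (SourceRecord record : batch) {
                producer.send(convert(record));
            }
            // One offset record per source partition seen in this batch, written
            // in the same transaction so records and offsets commit atomically.
            for (ProducerRecord<byte[], byte[]> offsetRecord : offsetRecords(batch)) {
                producer.send(offsetRecord);
            }
            producer.commitTransaction();
        } catch (RuntimeException e) {
            producer.abortTransaction();
            throw e;
        }
    }

    private ProducerRecord<byte[], byte[]> convert(SourceRecord record) {
        throw new UnsupportedOperationException("hypothetical: the worker's converters");
    }

    private List<ProducerRecord<byte[], byte[]>> offsetRecords(List<SourceRecord> batch) {
        throw new UnsupportedOperationException("hypothetical: offset serialization");
    }
}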

Thanks, and best regards.

Randall

On Tue, May 4, 2021 at 4:11 PM Chris Egerton 
wrote:

> Hi all,
>
> Good news everyone! I've reworked the design one more time and hopefully
> some of the improvements here should make the proposal more palatable.
> TL;DR:
>
> - Rolling upgrades are now possible, in at most two phases; workers will
> first have to be given a binary upgrade to 3.0 (the targeted version for
> this feature) which can be a rolling upgrade, and then a rolling upgrade to
> enable exactly-once source support in a cluster should be possible with no
> anticipated downtime for source connectors or their tasks
> - Offset topic migration is completely removed in favor of fallback to the
> global offsets topic (much simpler!)
> - One backwards-incompatible change is introduced: the leader will be
> required to use a transactional producer for writes to the config topic
> regardless of whether exactly-once support is enabled on the cluster.
> Technically we could gate this behind a config property but since the
> benefits of a transactional producer actually exte

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-08 Thread Randall Hauch
Thanks for continuing to work on this KIP, Chris.

On Fri, May 7, 2021 at 12:58 PM Chris Egerton 
wrote:

> b) An annotation is a really cool way to allow connector developers to
> signal eligibility for exactly-once to Connect (and possibly, through
> Connect, to users). Like you mention, connectors both with and without the
> annotation could still run on both pre- and post-upgrade workers with no
> worries about missing class errors. And it only requires a single-line
> change to each connector. My only concern is that with some connectors,
> exactly-once support might not be all-or-nothing and might be dependent on
> how the connector is configured. For a practical example, Confluent's JDBC
> source connector would likely be eligible for exactly-once when run in
> incrementing mode (where it tracks offsets based on the value of a
> monotonically-increasing table column), but not in bulk mode (where it
> doesn't provide offsets for its records at all). With that in mind, what do
> you think about a new "exactlyOnce()" method to the SourceConnector class
> that can return a new ExactlyOnce enum with options of "SUPPORTED",
> "UNSUPPORTED", and "UNKNOWN", with a default implementation that returns
> "UNKNOWN"? This can be invoked by Connect after start() has been called to
> give the connector a chance to choose its response based on its
> configuration.
> As far as what to do with this information goes--I think it'd go pretty
> nicely in the response from the GET /connectors/{connector} endpoint, which
> currently includes information about the connector's name, configuration,
> and task IDs. We could store the info in the config topic in the same
> record that contains the connector's configuration whenever a connector is
> (re)configured, which would guarantee that the information provided about
> eligibility for exactly-once matches the configuration it was derived from,
> and would present no compatibility issues (older workers would be able to
> read records written by new workers and vice-versa). Thoughts?
>

The worker has to do (or plan to do) something with the information about a
connector's support for EOS, whether that's via an annotation or a method.
Otherwise, what's the point of requiring the connector to expose this
information? But the problem I see with this whole concept is that there
will still be ambiguity. For example, if the method returns `UNKNOWN` by
default, the connector could still be written in a way where EOS does work
with the connector. Yet what are users supposed to do when they see
"UNKNOWN"?


Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-07 Thread Chris Egerton
Hi Tom,

Thanks for taking a look! Really appreciate it. Answers below:

1.
a) This is possible, but it would likely require reworking the rebalance
protocol, and would not provide comprehensive guarantees about exactly-once
delivery. If the group coordination protocol used for rebalancing is also
leveraged to determine whether every worker has exactly-once source support
enabled, it will by definition fail to provide information about zombie
workers that are up and actively processing data, but have become
disconnected from the group coordinator. Given the additional effort
required and the potential for false positives (which could lead to a
pretty bad user experience and undermine trust in the feature) I'm not sure
it'd be worth it to try to add that kind of automatic detection, but I'd be
interested in your thoughts, especially if there's an easier way to get
what we want here.
b) An annotation is a really cool way to allow connector developers to
signal eligibility for exactly-once to Connect (and possibly, through
Connect, to users). Like you mention, connectors both with and without the
annotation could still run on both pre- and post-upgrade workers with no
worries about missing class errors. And it only requires a single-line
change to each connector. My only concern is that with some connectors,
exactly-once support might not be all-or-nothing and might be dependent on
how the connector is configured. For a practical example, Confluent's JDBC
source connector would likely be eligible for exactly-once when run in
incrementing mode (where it tracks offsets based on the value of a
monotonically-increasing table column), but not in bulk mode (where it
doesn't provide offsets for its records at all). With that in mind, what do
you think about a new "exactlyOnce()" method to the SourceConnector class
that can return a new ExactlyOnce enum with options of "SUPPORTED",
"UNSUPPORTED", and "UNKNOWN", with a default implementation that returns
"UNKNOWN"? This can be invoked by Connect after start() has been called to
give the connector a chance to choose its response based on its
configuration.
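
As a rough sketch of what that could look like for a JDBC-style connector (the
method and enum are just the shape proposed above, and the "mode" config and its
values are only illustrative):

import java.util.Map;
import org.apache.kafka.connect.source.SourceConnector;

public abstract class JdbcStyleSourceConnector extends SourceConnector {

    // Enum shown nested here only to keep the sketch self-contained; in the
    // proposal it would be a framework type.
    public enum ExactlyOnce { SUPPORTED, UNSUPPORTED, UNKNOWN }

    private Map<String, String> config;

    @Override
    public void start(Map<String, String> props) {
        this.config = props;
    }

    // Hypothetical implementation of the proposed SourceConnector method; invoked
    // by Connect after start(), so the answer can depend on the configuration.
    public ExactlyOnce exactlyOnce() {
        String mode = config.getOrDefault("mode", "bulk");
        switch (mode) {
            case "incrementing":
            case "timestamp+incrementing":
                return ExactlyOnce.SUPPORTED;   // offsets are well-defined per record
            case "bulk":
                return ExactlyOnce.UNSUPPORTED; // no per-record offsets at all
            default:
                return ExactlyOnce.UNKNOWN;
        }
    }
}
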
As far as what to do with this information goes--I think it'd go pretty
nicely in the response from the GET /connectors/{connector} endpoint, which
currently includes information about the connector's name, configuration,
and task IDs. We could store the info in the config topic in the same
record that contains the connector's configuration whenever a connector is
(re)configured, which would guarantee that the information provided about
eligibility for exactly-once matches the configuration it was derived from,
and would present no compatibility issues (older workers would be able to
read records written by new workers and vice-versa). Thoughts?

2. You're correct; because there is no blocking, we have no guarantees that
the global offsets topic is ever actually populated, and if there are
issues with populating it and then a hard downgrade is necessary, a flood
of duplicates could result. My thinking here was that we'd already be
taking a hit in performance by introducing transactions and switching from
an asynchronous offset commit model to a synchronous one; I didn't want to
slow things down any further by having to write to not one, but two
different offsets topics.

3. Added to the KIP but reiterating here: the authorizations required for
`Admin::fenceProducers` will be the same as the ones required to use a
transactional producer; specifically, grants for the Write and Describe
operations on all of the TransactionalId resources by the user, and a grant
for the IdempotentWrite operation on the Cluster resource.
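
For example, those grants could be created with the admin client along these
lines (principal, transactional ID, and bootstrap address are placeholders):

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class GrantTransactionalAcls {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        String principal = "User:connect-worker";
        ResourcePattern txnId = new ResourcePattern(
                ResourceType.TRANSACTIONAL_ID, "my-connector-txn-id", PatternType.LITERAL);
        ResourcePattern cluster = new ResourcePattern(
                ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL);

        try (Admin admin = Admin.create(props)) {
            admin.createAcls(List.of(
                    new AclBinding(txnId, new AccessControlEntry(
                            principal, "*", AclOperation.WRITE, AclPermissionType.ALLOW)),
                    new AclBinding(txnId, new AccessControlEntry(
                            principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)),
                    new AclBinding(cluster, new AccessControlEntry(
                            principal, "*", AclOperation.IDEMPOTENT_WRITE, AclPermissionType.ALLOW))
            )).all().get();
        }
    }
}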

4. Added to the KIP but reiterating here: the name of the topic will be the
value of the "offsets.storage.topic" property in the worker config.

Cheers, and thanks again for the review!

Chris

On Thu, May 6, 2021 at 12:14 PM Tom Bentley  wrote:

> Hi Chris,
>
> Thanks for this KIP. I've taken an initial look and have a few questions.
>
> 1. The doc for exactly.once.source.enabled says "Note that this must be
> enabled on every worker in a cluster in order for exactly-once delivery to
> be guaranteed, and that some source connectors may still not be able to
> provide exactly-once delivery guarantees even with this support enabled."
> a) Could we detect when only some workers in the cluster had support
> enabled, and make it apparent that exactly-once wasn't guaranteed?
> b) I'm wondering how users will be able to reason about when a connector is
> really giving them exactly-once. I think this is defined in the limitations
> section much later on, right? It seems to require somewhat detailed
> knowledge of how the connector is implemented. And right now there's no
> standard way for a connector author to advertise their connector as being
> compatible. I wonder if we could tackle this in a compatible way using an
> annotation on the Connector class. That shouldn't cause problems for older
> versions of Co

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-06 Thread Tom Bentley
Hi Chris,

Thanks for this KIP. I've taken an initial look and have a few questions.

1. The doc for exactly.once.source.enabled says "Note that this must be
enabled on every worker in a cluster in order for exactly-once delivery to
be guaranteed, and that some source connectors may still not be able to
provide exactly-once delivery guarantees even with this support enabled."
a) Could we detect when only some workers in the cluster had support
enabled, and make it apparent that exactly-once wasn't guaranteed?
b) I'm wondering how users will be able to reason about when a connector is
really giving them exactly-once. I think this is defined in the limitations
section much later on, right? It seems to require somewhat detailed
knowledge of how the connector is implemented. And right now there's no
standard way for a connector author to advertise their connector as being
compatible. I wonder if we could tackle this in a compatible way using an
annotation on the Connector class. That shouldn't cause problems for older
versions of Connect from running connectors with the annotation (it's not
an error for an annotation type to not be present at runtime), but would
allow the support for exactly-once to be apparent and perhaps even exposed
through the connector status REST endpoint. It completely relies on the
connector author honouring the contract, of course, but it doesn't have the
compatibility problems of using a marker interface, for example.
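
To make that concrete, the annotation could be as simple as something like this
(purely illustrative--no such annotation exists today):

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical marker a connector author could apply to advertise exactly-once
// compatibility. Workers that don't know the type simply never look it up, so
// older runtimes see no missing-class errors.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface ExactlyOnceSource {
}

A connector author would then just annotate their SourceConnector subclass with
@ExactlyOnceSource, and workers that understand the annotation could surface it
through the status endpoint.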

2. About the post-transaction offset commit: " This will be handled on a
separate thread from the task’s work and offset commit threads, and should
not block or interfere with the task at all." If there's no blocking then
how can we be sure that the write to the global offsets topic ever actually
happens? If it never happens then presumably in case of a hard downgrade we
could see arbitrarily many duplicates? I don't necessarily see this as a
show-stopper, more I'm trying to understand what's possible with this
design.

3. What authorization is needed for Admin.fenceProducers()?

4. Maybe I missed it, but when a per-connector offsets storage topic is
created implicitly what will it be called?

Cheers,

Tom


On Tue, May 4, 2021 at 10:26 PM Chris Egerton 
wrote:

> Hi all,
>
> Good news everyone! I've reworked the design one more time and hopefully
> some of the improvements here should make the proposal more palatable.
> TL;DR:
>
> - Rolling upgrades are now possible, in at most two phases; workers will
> first have to be given a binary upgrade to 3.0 (the targeted version for
> this feature) which can be a rolling upgrade, and then a rolling upgrade to
> enable exactly-once source support in a cluster should be possible with no
> anticipated downtime for source connectors or their tasks
> - Offset topic migration is completely removed in favor of fallback to the
> global offsets topic (much simpler!)
> - One backwards-incompatible change is introduced: the leader will be
> required to use a transactional producer for writes to the config topic
> regardless of whether exactly-once support is enabled on the cluster.
> Technically we could gate this behind a config property but since the
> benefits of a transactional producer actually extend beyond exactly-once
> source support (we can now ensure that there's only one writer to the
> config topic at any given time, which isn't guaranteed with the current
> model) and the cost to accommodate it is fairly low (a handful of
> well-defined and limited-scope ACLs), I erred on the side of keeping things
> simple
>
> Looking forward to the next round of review and really hoping we can get
> the ball rolling in time for this to land with 3.0!
>
> Cheers,
>
> Chris
>
> On Mon, Apr 12, 2021 at 7:51 AM Chris Egerton  wrote:
>
> > Hi Randall,
> >
> > After thinking things over carefully, I've done some reworking of the
> > design. Instead of performing zombie fencing during rebalance, the leader
> > will expose an internal REST endpoint that will allow workers to request
> a
> > round of zombie fencing on demand, at any time. Workers will then hit
> this
> > endpoint after starting connectors and after task config updates for
> > connectors are detected; the precise details of this are outlined in the
> > KIP. If a round of fencing should fail for any reason, the worker will be
> > able to mark its Connector failed and, if the user wants to retry, they
> can
> > simply restart the Connector via the REST API (specifically, the POST
> > /connectors/{connector}/restart endpoint).
> >
> > The idea I'd been playing with to allow workers to directly write to the
> > config topic seemed promising at first, but it allowed things to get
> pretty
> > hairy for users if any kind of rebalancing bug took place and two workers
> > believed they owned the same Connector object.
> >
> > I hope this answers any outstanding questions and look forward to your
> > thoughts.
> >
> > Cheers,
> >
> > Chris
> >
> > On Mon, Mar 22, 2021 at 4:38 PM Chris Eger

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-05-04 Thread Chris Egerton
Hi all,

Good news everyone! I've reworked the design one more time and hopefully
some of the improvements here should make the proposal more palatable.
TL;DR:

- Rolling upgrades are now possible, in at most two phases; workers will
first have to be given a binary upgrade to 3.0 (the targeted version for
this feature) which can be a rolling upgrade, and then a rolling upgrade to
enable exactly-once source support in a cluster should be possible with no
anticipated downtime for source connectors or their tasks
- Offset topic migration is completely removed in favor of fallback to the
global offsets topic (much simpler!)
- One backwards-incompatible change is introduced: the leader will be
required to use a transactional producer for writes to the config topic
regardless of whether exactly-once support is enabled on the cluster.
Technically we could gate this behind a config property but since the
benefits of a transactional producer actually extend beyond exactly-once
source support (we can now ensure that there's only one writer to the
config topic at any given time, which isn't guaranteed with the current
model) and the cost to accommodate it is fairly low (a handful of
well-defined and limited-scope ACLs), I erred on the side of keeping things
simple
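
To make the "handful of ACLs" point concrete, here's a rough sketch of
granting the worker principal the transactional-ID permissions with the Java
Admin client. The principal, transactional ID, and bootstrap address below are
all made up; the actual transactional ID would be derived from the Connect
cluster's group.id:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class GrantConfigTopicTxnAcls {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Illustrative values only.
        ResourcePattern txnId = new ResourcePattern(
                ResourceType.TRANSACTIONAL_ID, "connect-cluster-configs", PatternType.LITERAL);
        AccessControlEntry write = new AccessControlEntry(
                "User:connect-worker", "*", AclOperation.WRITE, AclPermissionType.ALLOW);
        AccessControlEntry describe = new AccessControlEntry(
                "User:connect-worker", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW);

        try (Admin admin = Admin.create(props)) {
            admin.createAcls(Arrays.asList(
                    new AclBinding(txnId, write),
                    new AclBinding(txnId, describe))).all().get();
        }
    }
}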

Looking forward to the next round of review and really hoping we can get
the ball rolling in time for this to land with 3.0!

Cheers,

Chris

On Mon, Apr 12, 2021 at 7:51 AM Chris Egerton  wrote:

> Hi Randall,
>
> After thinking things over carefully, I've done some reworking of the
> design. Instead of performing zombie fencing during rebalance, the leader
> will expose an internal REST endpoint that will allow workers to request a
> round of zombie fencing on demand, at any time. Workers will then hit this
> endpoint after starting connectors and after task config updates for
> connectors are detected; the precise details of this are outlined in the
> KIP. If a round of fencing should fail for any reason, the worker will be
> able to mark its Connector failed and, if the user wants to retry, they can
> simply restart the Connector via the REST API (specifically, the POST
> /connectors/{connector}/restart endpoint).
>
> The idea I'd been playing with to allow workers to directly write to the
> config topic seemed promising at first, but it allowed things to get pretty
> hairy for users if any kind of rebalancing bug took place and two workers
> believed they owned the same Connector object.
>
> I hope this answers any outstanding questions and look forward to your
> thoughts.
>
> Cheers,
>
> Chris
>
> On Mon, Mar 22, 2021 at 4:38 PM Chris Egerton  wrote:
>
>> Hi Randall,
>>
>> No complaints about email size from me. Let's dive in!
>>
>> 1. Sure! Especially important in my mind is that this is already possible
>> with Connect as it is today, and users can benefit from this with or
>> without the expanded exactly-once source support we're trying to add with
>> this KIP. I've added that info to the "Motivation" section and included a
>> brief overview of the idempotent producer in the "Background and
>> References" section.
>>
>> 2. I actually hadn't considered enabling exactly-once source support by
>> default. Thinking over it now, I'm a little hesitant to do so just because,
>> even with the best testing out there, it's a pretty large change and it
>> seems safest to try to make it opt-in in case there's unanticipated
>> fallout. Then again, when incremental cooperative rebalancing was
>> introduced, it was made opt-out instead of opt-in. However, ICR came with
>> lower known risk of breaking existing users' setups; we know for a fact
>> that, if you haven't granted your worker or connector principals some ACLs
>> on Kafka, your connectors will fail. In an ideal world people would
>> carefully read upgrade notes and either grant those permissions or disable
>> the feature before upgrading their Connect cluster to 3.0, but if they
>> don't, they'll be in for a world of hurt. Overall I'd be more comfortable
>> letting this feature incubate for a little bit to let everyone get familiar
>> with it before possibly enabling it in 4.0 by default; what do you think?
>>
>> 3. I didn't think too long about the name for the offsets topic property;
>> it seemed pretty unlikely that it'd conflict with existing connector
>> property names. One alternative could be to follow the idiom established by
>> KIP-458 and include the word "override" in there somewhere, but none of the
>> resulting names seem very intuitive ("offsets.storage.override.topic" seems
>> the best to me but even that isn't super clear). Happy to field suggestions
>> if anyone has any alternatives they'd like to propose.
>>
>> 4. I _really_ wanted to enable per-connector toggling of this feature for
>> the exact reasons you've outlined. There's already a couple cases where
>> some piece of functionality was introduced that users could only control at
>> the worker config level and, later, effort had to

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-04-12 Thread Chris Egerton
Hi Randall,

After thinking things over carefully, I've done some reworking of the
design. Instead of performing zombie fencing during rebalance, the leader
will expose an internal REST endpoint that will allow workers to request a
round of zombie fencing on demand, at any time. Workers will then hit this
endpoint after starting connectors and after task config updates for
connectors are detected; the precise details of this are outlined in the
KIP. If a round of fencing should fail for any reason, the worker will be
able to mark its Connector failed and, if the user wants to retry, they can
simply restart the Connector via the REST API (specifically, the POST
/connectors/{connector}/restart endpoint).
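
For completeness, retrying after a failed fencing round is just the usual REST
call against the worker. A small illustrative Java 11 snippet (the worker URL
and connector name "my-connector" are made up):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestartConnectorExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // POST /connectors/{connector}/restart
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/my-connector/restart"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("Restart request returned HTTP " + response.statusCode());
    }
}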

The idea I'd been playing with to allow workers to directly write to the
config topic seemed promising at first, but it allowed things to get pretty
hairy for users if any kind of rebalancing bug took place and two workers
believed they owned the same Connector object.

I hope this answers any outstanding questions and look forward to your
thoughts.

Cheers,

Chris

On Mon, Mar 22, 2021 at 4:38 PM Chris Egerton  wrote:

> Hi Randall,
>
> No complaints about email size from me. Let's dive in!
>
> 1. Sure! Especially important in my mind is that this is already possible
> with Connect as it is today, and users can benefit from this with or
> without the expanded exactly-once source support we're trying to add with
> this KIP. I've added that info to the "Motivation" section and included a
> brief overview of the idempotent producer in the "Background and
> References" section.
>
> 2. I actually hadn't considered enabling exactly-once source support by
> default. Thinking over it now, I'm a little hesitant to do so just because,
> even with the best testing out there, it's a pretty large change and it
> seems safest to try to make it opt-in in case there's unanticipated
> fallout. Then again, when incremental cooperative rebalancing was
> introduced, it was made opt-out instead of opt-in. However, ICR came with
> lower known risk of breaking existing users' setups; we know for a fact
> that, if you haven't granted your worker or connector principals some ACLs
> on Kafka, your connectors will fail. In an ideal world people would
> carefully read upgrade notes and either grant those permissions or disable
> the feature before upgrading their Connect cluster to 3.0, but if they
> don't, they'll be in for a world of hurt. Overall I'd be more comfortable
> letting this feature incubate for a little bit to let everyone get familiar
> with it before possibly enabling it in 4.0 by default; what do you think?
>
> 3. I didn't think too long about the name for the offsets topic property;
> it seemed pretty unlikely that it'd conflict with existing connector
> property names. One alternative could be to follow the idiom established by
> KIP-458 and include the word "override" in there somewhere, but none of the
> resulting names seem very intuitive ("offsets.storage.override.topic" seems
> the best to me but even that isn't super clear). Happy to field suggestions
> if anyone has any alternatives they'd like to propose.
>
> 4. I _really_ wanted to enable per-connector toggling of this feature for
> the exact reasons you've outlined. There's already a couple cases where
> some piece of functionality was introduced that users could only control at
> the worker config level and, later, effort had to be made in order to add
> per-connector granularity: key/value converters and connector Kafka clients
> are the two at the top of my head, and there may be others. So if at all
> possible, it'd be great if we could support this. The only thing standing
> in the way is that it allows exactly-once delivery to be compromised, which
> in my estimation was unacceptable. I'm hoping we can make this feature
> great enough that it'll work with most if not all source connectors out
> there, and users won't even want to toggle it on a per-connector basis.
> Otherwise, we'll have to decide between forcing users to split their
> connectors across two Connect clusters (one with exactly-once source
> enabled, one with it disabled), which would be awful, or potentially seeing
> duplicate record delivery for exactly-once connectors, which is also awful.
>
> 5. Existing source connectors won't necessarily have to be configured with
> "consumer.override" properties, but there are a couple of cases where that
> will be necessary:
>
> a. Connector is running against a secured Kafka cluster and the
> principal configured in the worker via the various "consumer." properties
> (if there is one) doesn't have permission to access the offsets topic that
> the connector will use. If no "consumer.override." properties are specified
> in this case, the connector and its tasks will fail.
>
> b. Connector is running against a separate Kafka cluster from the one
> specified in the worker's config with the "bootstrap.servers" property (or
> the "consumer.

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-03-22 Thread Chris Egerton
Hi Randall,

No complaints about email size from me. Let's dive in!

1. Sure! Especially important in my mind is that this is already possible
with Connect as it is today, and users can benefit from this with or
without the expanded exactly-once source support we're trying to add with
this KIP. I've added that info to the "Motivation" section and included a
brief overview of the idempotent producer in the "Background and
References" section.

2. I actually hadn't considered enabling exactly-once source support by
default. Thinking over it now, I'm a little hesitant to do so just because,
even with the best testing out there, it's a pretty large change and it
seems safest to try to make it opt-in in case there's unanticipated
fallout. Then again, when incremental cooperative rebalancing was
introduced, it was made opt-out instead of opt-in. However, ICR came with
lower known risk of breaking existing users' setups; we know for a fact
that, if you haven't granted your worker or connector principals some ACLs
on Kafka, your connectors will fail. In an ideal world people would
carefully read upgrade notes and either grant those permissions or disable
the feature before upgrading their Connect cluster to 3.0, but if they
don't, they'll be in for a world of hurt. Overall I'd be more comfortable
letting this feature incubate for a little bit to let everyone get familiar
with it before possibly enabling it in 4.0 by default; what do you think?

3. I didn't think too long about the name for the offsets topic property;
it seemed pretty unlikely that it'd conflict with existing connector
property names. One alternative could be to follow the idiom established by
KIP-458 and include the word "override" in there somewhere, but none of the
resulting names seem very intuitive ("offsets.storage.override.topic" seems
the best to me but even that isn't super clear). Happy to field suggestions
if anyone has any alternatives they'd like to propose.

4. I _really_ wanted to enable per-connector toggling of this feature for
the exact reasons you've outlined. There's already a couple cases where
some piece of functionality was introduced that users could only control at
the worker config level and, later, effort had to be made in order to add
per-connector granularity: key/value converters and connector Kafka clients
are the two at the top of my head, and there may be others. So if at all
possible, it'd be great if we could support this. The only thing standing
in the way is that it allows exactly-once delivery to be compromised, which
in my estimation was unacceptable. I'm hoping we can make this feature
great enough that it'll work with most if not all source connectors out
there, and users won't even want to toggle it on a per-connector basis.
Otherwise, we'll have to decide between forcing users to split their
connectors across two Connect clusters (one with exactly-once source
enabled, one with it disabled), which would be awful, or potentially seeing
duplicate record delivery for exactly-once connectors, which is also awful.

5. Existing source connectors won't necessarily have to be configured with
"consumer.override" properties, but there are a couple of cases where that
will be necessary:

a. Connector is running against a secured Kafka cluster and the
principal configured in the worker via the various "consumer." properties
(if there is one) doesn't have permission to access the offsets topic that
the connector will use. If no "consumer.override." properties are specified
in this case, the connector and its tasks will fail.

b. Connector is running against a separate Kafka cluster from the one
specified in the worker's config with the "bootstrap.servers" property (or
the "consumer.bootstrap.servers" property, if specified). For an example,
the connector is configured with a "producer.override.boostrap.servers"
property. If no corresponding "consumer.override.bootstrap.servers"
property is specified by the connector (or on is provided but doesn't match
the one used for the producer), bad things will happen--best case, the
connector refuses to start up because it's configured with a separate
offsets topic and no offsets migration is able to take place; worst case,
the connector starts reading offsets from a stale topic and every time a
task is restarted, duplicate records ensue. Since this can compromise
delivery guarantees I think action is worth taking here, and I'm glad
you've raised this scenario. Proposed approach: If the user hasn't
specified a "consumer.override.bootstrap.servers" property in their
connector config and the "bootstrap.servers" property that we'd configure
its consumer with (based on either the "consumer.bootstrap.servers" worker
property or, if not provided, the "bootstrap.servers" worker property) is
different from the "bootstrap.servers" that we'd configure its producer
with, we can log a warning but then automatically configure the consumer
with the "bootstrap.servers" that are going to b

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-03-22 Thread Chris Egerton
Hi Randall,

Thanks for taking a look! Responses inline:

> This would seem to make the transactions based upon time rather than record
> count. Is that really the intent? Did you consider using one transaction
> per "batch" of records, whatever number of records that means? One issue
> with the time-based boundary is that `offset.flush.interval.ms` defaults to
> 60 seconds, and that could mean many hundreds of thousands of records on a
> high throughput task. (BTW, I do agree with the KIP's rejected alternative
> to not expose to the task or let the task define the transaction
> boundaries.)

Yep, this was the intent. I covered a few alternatives in the "Finer
control over offset commits" section in the rejected alternatives all the
way at the bottom, including allowing source tasks to define their own
transaction boundaries, allowing per-connector source offset intervals, and
allowing offsets to be committed after a certain number of records. While
writing this up I also realized that, besides establishing batch boundaries
based on the number of records, another possibility could also be to count
each collection of records returned from a single call to SourceTask::poll
as its own batch, although this might seriously hamper throughput for tasks
that provide a small number of records at a time, but do so with high
frequency.

I was hoping we could save this for future work since it's unclear that it
would be necessary to operate Connect with exactly-once source support. It
almost seems like a bit of a leaky abstraction for users to have to worry
about the details of offset commit logic, and I'm hoping we can do some
magic behind the curtains to avoid this. Here's a few thoughts on how we
can keep fixed-time-interval offset flushing with exactly-once source
support without making things worse for users:

1. Ignore the offset flush timeout parameter for exactly-once source tasks.
They should be allowed to take as long as they want to complete an offset
commit, since the cost of failure at that point is to fail the source task.
This might require a change to how Connect manages source task offset
commits right now since a single worker-global thread is used to schedule
and perform offset commits for all source tasks, and any blocked task will
also block offset commits for all other tasks, but I'm confident that we
can find a way to solve this problem and would like to leave it as an
implementation detail that doesn't need to be covered extensively in this
KIP.

2. The only other problem I foresee with high-throughput tasks like the
ones you described is that it may take longer than the transaction timeout
for a task to flush all of its records to Kafka. In this case, there are
two actions that users can take to nurse their connector back to health:
reconfigure it to produce records with a lower throughput, or increase the
transaction timeout for the producers used by the connector. We can include
these steps in the error message for a task that fails due to
producer transaction timeout.
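
To make the second remedy concrete: assuming the usual KIP-458 override prefix
applies to these producers, a user could raise the timeout with something like
the following (the connector name, class, and fifteen-minute value are all
illustrative, and the broker-side transaction.max.timeout.ms cap still
applies):

import java.util.HashMap;
import java.util.Map;

public class TransactionTimeoutOverrideExample {
    public static void main(String[] args) {
        Map<String, String> connectorConfig = new HashMap<>();
        connectorConfig.put("name", "my-high-throughput-source");                 // hypothetical
        connectorConfig.put("connector.class", "com.example.MySourceConnector");  // hypothetical
        // Raise the transaction timeout for the connector's task producers.
        connectorConfig.put("producer.override.transaction.timeout.ms", "900000");
        System.out.println(connectorConfig);
    }
}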

Does this seem reasonable? If so, I'll add this info to the design doc.


> Gwen's #5 question was about rebalances of a source connector. Your answer
> made sense, but while the KIP talks about source tasks being stopped before
> the connector is rebalanced, I'm wondering about the delay or other impact
> this has on the rebalance itself. The "Cannot fence out producers during
> rebalance" section talks about fencing out producers, but IIUC this is not
> the same as:
>
> ... all workers will preemptively stop all tasks of all source connectors
> for which task configurations.
>
> If this is already addressed in the KIP, please just point me to that
> section.

Not addressed yet :) I haven't called out a potential impact on rebalance
because, although there will probably be one, I don't believe it'll be
substantial. After a connector is reconfigured and its new task configs are
written to the config topic, the sequence of events for a single worker
would be:

1. New task configs are read from the config topic
2. All tasks for that connector are stopped in parallel (which is already
how en-masse task start/stop is performed by a distributed worker)
3. Any tasks that take longer than the graceful task shutdown timeout are
abandoned and allowed to continue running
4. Worker rejoins the group

The only new impact on rebalance might come from steps 2-3, but it's
unlikely to be significant. Distributed workers currently use a pool of
eight threads to handle en-masse start/stop of tasks and connectors, have a
default graceful task shutdown timeout of five seconds, and a default
rebalance timeout of sixty seconds. In order to block the worker for an
extra thirty seconds (only half of the rebalance timeout), there would have
to be (30 / 5) * 8 = 48 tasks that are either completely hung or drag their
feet for five seconds each during shutdown. This would have to either come
from a single connector, or a collection of connectors that were
reconfigured in ex

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-03-18 Thread Randall Hauch
Hey, Chris.

Thanks again for all your hard work figuring out this complicated problem.
While there are still issues to iron out, at the moment it seems pretty
sensible.

Here's my follow up email with additional comments/questions. I've
organized these by section to help provide context, and I've numbered the
questions to make it easier to reply/discuss. I apologize for the length of
this email -- I thought it better to do this all at once rather than string
out in several emails.

Motivation
1. This section does not mention another important source of duplicate
records: producer retries. Using the idempotent producer can prevent
duplicates due to retries, but it can't help with anything else. Perhaps
it's worth mentioning?

Public Interfaces
2. Have we considered `exactly.once.source.enabled=true` by default in the
worker config? We're currently on track for considering this KIP for
inclusion in AK 3.0, so we are allowed backward incompatible changes.
3. It is possible (but maybe unlikely) that existing source connectors
might already have a connector config property that matches the proposed
new connector config `offsets.storage.topic`. It's good that this matches
the worker's config, but were any other property names considered?
4. I see that one of the rejected alternatives was a connector-level option
to disable EOS. For example, what happens if some source connectors don't
work well with EOS (e.g., the transaction blocking behavior of a multi-task
source connector is unacceptable)? What would we say if a user wants to use
EOS for some source connectors but not others? Is it really viable to not
allow that?
5. The worker source task will create a consumer for the offset topic used
by the connector. Currently, Connect does not use or do anything with
`consumer.override.*` properties, but won't they now likely be needed with
a source connector using EOS? What happens with existing source connector
configurations that don't define these?

Proposed Changes
Offset reads
6. The KIP says "a message will be logged notifying them of this fact."
Should it be more specific, namely "a warning message..."?

SourceTask record commit API
7. The second paragraph talks about why we cannot guarantee the methods be
invoked after every successful commit, and I think that makes sense because
there is no other practical way to handle worker failures. It does seem
like it would be more useful to describe the expectation about calling
these methods during normal task shutdown. Specifically, should connector
developers expect that these methods will be called following a successful
commit immediately prior to stopping the task (barring worker failures)?

Per-connector offsets topic
This section says:

> Finally, it allows users to limit the effect that hanging transactions on
> an offsets topic will have. If tasks A and B use the same offsets topic,
> and task A initiates a transaction on that offsets topic right before task
> B starts up, then task A dies suddenly without committing its transaction,
> task B will have to wait for that transaction to time out before it can
> read to the end of the offsets topic. If the transaction timeout is set
> very high for task A (to accommodate bursts of high throughput, for
> example), this will block task B from processing any data for a long time.
> Although this scenario may be unavoidable in some cases, using a dedicated
> offsets topic for each connector should allow cluster administrators to
> isolate the blast radius of a hanging transaction on an offsets topic. This
> way, although tasks of the same connector may still interfere with each
> other, they will at least not interfere with tasks of other connectors.
> This should be sufficient for most multitenant environments.


8. Won't there also be transaction contention just within a single
connector using multiple tasks? Even if that connector has all offsets go
to a dedicated topic (separate from all other connectors), then isn't it
still possible/likely that a transaction from task 1 blocks a transaction
from task 2? How will this affect latency perceived by consumers (e.g., the
records from task 2's transaction don't become visible until the
transactions from task 1 and task 2 commit)?

Migration
9. This section says:

> the worker will read to the end of the offsets topic before starting the
> task. If there are no offsets for the connector present in the topic and
> there is no sentinel value...

This sounds like the worker will know that a sentinel value is expected,
but IIUC a worker cannot know that. Instead, the worker consuming the topic
must read to the end and expect _either_ a sentinel value _or_ committed
offsets. Can the text make this more clear?

Task count records
10. This section says "then workers will not bring up tasks for the
connector." What does this mean? Does it mean the tasks fail? How do users
recover from this? I think the "Addressed failure/degradation scenarios"
section tries to address some of

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-03-18 Thread Randall Hauch
Chris,

Thanks for the well-written KIP and the more recent updates. It's exciting
to envision EOS for Connect source connectors!

I'll follow up this email with another that goes into specific comments and
questions not already touched on by others. This email addresses comments
and questions previously discussed on this thread.

I'd like to +1 your response to Gwen's question about connector-specific
offsets topic (#2 in her email). But importantly, a connector can override
any producer settings, including Kafka credentials that don't have
permissions on the internal Connect offsets topic. For this reason, I think
it's necessary for this KIP to address those scenarios.

Gwen's #4 question was about how transactions related to the batches source
connector tasks return from their `poll()` method, and you followed up by
clarifying the document to make this more clear. IIUC, the KIP suggests
still using the existing `offset.flush.interval.ms` property and then
committing transactions based (solely?) upon the value of the property.
This would seem to make the transactions based upon time rather than record
count. Is that really the intent? Did you consider using one transaction
per "batch" of records, whatever number of records that means? One issue
with the time-based boundary is that `offset.flush.interval.ms` defaults to
60 seconds, and that could mean many hundreds of thousands of records on a
high throughput task. (BTW, I do agree with the KIP's rejected alternative
to not expose to the task or let the task define the transaction
boundaries.)

Gwen's #5 question was about rebalances of a source connector. Your answer
made sense, but while the KIP talks about source tasks being stopped before
the connector is rebalanced, I'm wondering about the delay or other impact
this has on the rebalance itself. The "Cannot fence out producers during
rebalance" section talks about fencing out producers, but IIUC this is not
the same as:

... all workers will preemptively stop all tasks of all source connectors
for which task configurations.


If this is already addressed in the KIP, please just point me to that
section.

Finally, regarding Guozhang's question about "partial rebalances" rather
than fencing out all producers. In your initial response, you said the
following:

Connectors can arbitrarily reassign source partitions (such as
database tables, or Kafka topic partitions) across tasks, so even if the
assignment of tasks across workers remains unchanged, the assignment of
source partitions across those workers might.


This is a very important point. In fact, there are definitely connectors
that shuffle around "source partition" assignments to different tasks, and
IMO we have to take that into consideration because Connect provides no
guidance on this and because there are good reasons to do this. So I'm glad
the KIP takes this into account. This is actually one of the issues I ran
into during an earlier attempt to POC EOS for source connectors, and the
ability to force fencing is a novel (albeit heavy-handed) way to deal with
that.

As I said, I'll follow up in subsequent emails on other specific questions
I have.

Best regards,

Randall

On Mon, Feb 22, 2021 at 6:45 PM Guozhang Wang  wrote:

> This is a great explanation, thank you!
>
> On Mon, Feb 22, 2021 at 2:44 PM Chris Egerton  wrote:
>
> > Hi Guozhang,
> >
> > Your understanding should be correct in most cases, but there are two
> finer
> > points that may interest you:
> >
> > 1. It's technically dependent on the implementation of the connector and
> > how it chooses to allocate source partitions across its tasks; even if
> the
> > number of tasks and source partitions remains completely unchanged, a
> > connector may still choose to shuffle around the partition->task
> > allocation. I can't think of any cases where this might happen off the
> top
> > of my head, but it seemed worth sharing given the educational nature of
> the
> > question.
> > 2. It's also possible that the number of source partitions remains
> > unchanged, but the set of source partitions changes. One case where this
> > might happen is with a database connector monitoring for tables that
> match
> > a given regex every five minutes; if a table that matched that regex
> during
> > the last scan got assigned to a task and then dropped, and then another
> > table that matched the regex got added before the next scan, the
> connector
> > would see the same number of tables, but the actual set would be
> different.
> > At this point, it would again be connector-dependent for whether the
> > already-assigned tables stayed assigned to the same tasks. Is anyone else
> > reminded of the various consumer partition assignment strategies at this
> > point?
> >
> > A general comment I should make here (not necessarily for your benefit
> but
> > for anyone following along) is that it's important to keep in mind that
> > "source partitions" in Kafka Connect aren't Kafka topic partitions (well,
> >

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-02-22 Thread Guozhang Wang
This is a great explanation, thank you!

On Mon, Feb 22, 2021 at 2:44 PM Chris Egerton  wrote:

> Hi Guozhang,
>
> Your understanding should be correct in most cases, but there are two finer
> points that may interest you:
>
> 1. It's technically dependent on the implementation of the connector and
> how it chooses to allocate source partitions across its tasks; even if the
> number of tasks and source partitions remains completely unchanged, a
> connector may still choose to shuffle around the partition->task
> allocation. I can't think of any cases where this might happen off the top
> of my head, but it seemed worth sharing given the educational nature of the
> question.
> 2. It's also possible that the number of source partitions remains
> unchanged, but the set of source partitions changes. One case where this
> might happen is with a database connector monitoring for tables that match
> a given regex every five minutes; if a table that matched that regex during
> the last scan got assigned to a task and then dropped, and then another
> table that matched the regex got added before the next scan, the connector
> would see the same number of tables, but the actual set would be different.
> At this point, it would again be connector-dependent for whether the
> already-assigned tables stayed assigned to the same tasks. Is anyone else
> reminded of the various consumer partition assignment strategies at this
> point?
>
> A general comment I should make here (not necessarily for your benefit but
> for anyone following along) is that it's important to keep in mind that
> "source partitions" in Kafka Connect aren't Kafka topic partitions (well,
> unless your connector is designed to replicate data across Kafka clusters
> like MirrorMaker 2). As a result, we have to rely on developer-written code
> to a greater extent to define what the source partitions for a source
> connector are, and how to divvy up that work amongst tasks.
>
> Hope this helps! If you have any further questions please hit me with them;
> I doubt you'll be the only one wondering about these things.
>
> Cheers,
>
> Chris
>
> On Mon, Feb 22, 2021 at 5:30 PM Guozhang Wang  wrote:
>
> > Thanks Chris, yeah I think I agree with you that this does not
> necessarily
> > have to be in the scope of this KIP.
> >
> > My understanding was that the source partitions -> tasks are not static
> but
> > dynamic, but they are only changed when either the number of partitions
> > changed or "tasks.max" config changed (please correct me if I'm wrong),
> so
> > what I'm thinking that we can try to detect if either of these things
> > happens, and if they do not happen we can assume the mapping from
> > partitions -> tasks does not change --- of course this requires some
> > extension on the API, aligned with what you said. I would like to make
> sure
> > that my understanding here is correct :)
> >
> > Guozhang
> >
> >
> > On Mon, Feb 22, 2021 at 11:29 AM Chris Egerton 
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > Thanks for taking a look, and for your suggestion!
> > >
> > > I think there is room for more intelligent fencing strategies, but I
> > think
> > > that it'd have to be more nuanced than one based on task->worker
> > > assignments. Connectors can arbitrarily reassign source partitions
> (such
> > as
> > > database tables, or Kafka topic partitions) across tasks, so even if
> the
> > > assignment of tasks across workers remains unchanged, the assignment of
> > > source partitions across those workers might. Connect doesn't do any
> > > inspection of task configurations at the moment, and without expansions
> > to
> > > the Connector/Task API, it'd likely be impossible to get information
> from
> > > tasks about their source partition assignments. With that in mind, I
> > think
> > > we may want to leave the door open for more intelligent task fencing
> but
> > > not include that level of optimization at this stage. Does that sound
> > fair
> > > to you?
> > >
> > > There is one case that I've identified where we can cheaply optimize
> > right
> > > now: single-task connectors, such as the Debezium CDC source
> connectors.
> > If
> > > a connector is configured at some point with a single task, then some
> > other
> > > part of its configuration is altered but the single-task aspect
> remains,
> > > the leader doesn't have to worry about fencing out the older task as
> the
> > > new task's producer will do that automatically. In this case, the
> leader
> > > can skip the producer fencing round and just write the new task count
> > > record straight to the config topic. I've added this case to the KIP;
> if
> > it
> > > overcomplicates things I'm happy to remove it, but seeing as it serves
> a
> > > real use case and comes fairly cheap, I figured it'd be best to include
> > > now.
> > >
> > > Thanks again for your feedback; if you have other thoughts I'd love to
> > hear
> > > them!
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Mon, Feb 22, 2021

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-02-22 Thread Chris Egerton
Hi Guozhang,

Your understanding should be correct in most cases, but there are two finer
points that may interest you:

1. It's technically dependent on the implementation of the connector and
how it chooses to allocate source partitions across its tasks; even if the
number of tasks and source partitions remains completely unchanged, a
connector may still choose to shuffle around the partition->task
allocation. I can't think of any cases where this might happen off the top
of my head, but it seemed worth sharing given the educational nature of the
question.
2. It's also possible that the number of source partitions remains
unchanged, but the set of source partitions changes. One case where this
might happen is with a database connector monitoring for tables that match
a given regex every five minutes; if a table that matched that regex during
the last scan got assigned to a task and then dropped, and then another
table that matched the regex got added before the next scan, the connector
would see the same number of tables, but the actual set would be different.
At this point, it would again be connector-dependent for whether the
already-assigned tables stayed assigned to the same tasks. Is anyone else
reminded of the various consumer partition assignment strategies at this
point?

A general comment I should make here (not necessarily for your benefit but
for anyone following along) is that it's important to keep in mind that
"source partitions" in Kafka Connect aren't Kafka topic partitions (well,
unless your connector is designed to replicate data across Kafka clusters
like MirrorMaker 2). As a result, we have to rely on developer-written code
to a greater extent to define what the source partitions for a source
connector are, and how to divvy up that work amongst tasks.
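
As a purely illustrative example of that developer-written divvying-up (the
class, config names, and table list below are all made up), a table-based
source connector might spread its source partitions across tasks in
taskConfigs() roughly like this:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.util.ConnectorUtils;

/**
 * Illustrative connector that treats database tables as its source partitions
 * and spreads them across tasks. The assignment logic lives entirely in
 * connector code and is invisible to the Connect framework.
 */
public class TableSourceConnector extends SourceConnector {

    private List<String> tables;

    @Override
    public void start(Map<String, String> props) {
        // A real connector would discover these by scanning the database.
        tables = List.of("inventory.orders", "inventory.customers", "inventory.shipments");
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        // Group the source partitions (tables) into at most maxTasks chunks.
        int numGroups = Math.min(maxTasks, tables.size());
        for (List<String> group : ConnectorUtils.groupPartitions(tables, numGroups)) {
            configs.add(Map.of("tables", String.join(",", group)));
        }
        return configs;
    }

    @Override
    public void stop() { }

    @Override
    public Class<? extends Task> taskClass() {
        // A real connector would return its SourceTask implementation here.
        throw new UnsupportedOperationException("task class not shown in this sketch");
    }

    @Override
    public ConfigDef config() { return new ConfigDef(); }

    @Override
    public String version() { return "0.0.1"; }
}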

Hope this helps! If you have any further questions please hit me with them;
I doubt you'll be the only one wondering about these things.

Cheers,

Chris

On Mon, Feb 22, 2021 at 5:30 PM Guozhang Wang  wrote:

> Thanks Chris, yeah I think I agree with you that this does not necessarily
> have to be in the scope of this KIP.
>
> My understanding was that the source partitions -> tasks are not static but
> dynamic, but they are only changed when either the number of partitions
> changed or "tasks.max" config changed (please correct me if I'm wrong), so
> what I'm thinking that we can try to detect if either of these things
> happens, and if they do not happen we can assume the mapping from
> partitions -> tasks does not change --- of course this requires some
> extension on the API, aligned with what you said. I would like to make sure
> that my understanding here is correct :)
>
> Guozhang
>
>
> On Mon, Feb 22, 2021 at 11:29 AM Chris Egerton 
> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for taking a look, and for your suggestion!
> >
> > I think there is room for more intelligent fencing strategies, but I
> think
> > that it'd have to be more nuanced than one based on task->worker
> > assignments. Connectors can arbitrarily reassign source partitions (such
> as
> > database tables, or Kafka topic partitions) across tasks, so even if the
> > assignment of tasks across workers remains unchanged, the assignment of
> > source partitions across those workers might. Connect doesn't do any
> > inspection of task configurations at the moment, and without expansions
> to
> > the Connector/Task API, it'd likely be impossible to get information from
> > tasks about their source partition assignments. With that in mind, I
> think
> > we may want to leave the door open for more intelligent task fencing but
> > not include that level of optimization at this stage. Does that sound
> fair
> > to you?
> >
> > There is one case that I've identified where we can cheaply optimize
> right
> > now: single-task connectors, such as the Debezium CDC source connectors.
> If
> > a connector is configured at some point with a single task, then some
> other
> > part of its configuration is altered but the single-task aspect remains,
> > the leader doesn't have to worry about fencing out the older task as the
> > new task's producer will do that automatically. In this case, the leader
> > can skip the producer fencing round and just write the new task count
> > record straight to the config topic. I've added this case to the KIP; if
> it
> > overcomplicates things I'm happy to remove it, but seeing as it serves a
> > real use case and comes fairly cheap, I figured it'd be best to include
> > now.
> >
> > Thanks again for your feedback; if you have other thoughts I'd love to
> hear
> > them!
> >
> > Cheers,
> >
> > Chris
> >
> > On Mon, Feb 22, 2021 at 1:57 PM Chris Egerton 
> wrote:
> >
> > > Hi Gwen,
> > >
> > > Thanks for the feedback!
> > >
> > > 0.
> > > That's a great point; I've updated the motivation section with that
> > > rationale.
> > >
> > > 1.
> > > This enables safe "hard downgrades" of clusters where, instead of just
> > > disabling exactly-once support o

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-02-22 Thread Guozhang Wang
Thanks Chris, yeah I think I agree with you that this does not necessarily
have to be in the scope of this KIP.

My understanding was that the source partitions -> tasks are not static but
dynamic, but they are only changed when either the number of partitions
changed or "tasks.max" config changed (please correct me if I'm wrong), so
what I'm thinking that we can try to detect if either of these things
happens, and if they do not happen we can assume the mapping from
partitions -> tasks does not change --- of course this requires some
extension on the API, aligned with what you said. I would like to make sure
that my understanding here is correct :)

Guozhang


On Mon, Feb 22, 2021 at 11:29 AM Chris Egerton  wrote:

> Hi Guozhang,
>
> Thanks for taking a look, and for your suggestion!
>
> I think there is room for more intelligent fencing strategies, but I think
> that it'd have to be more nuanced than one based on task->worker
> assignments. Connectors can arbitrarily reassign source partitions (such as
> database tables, or Kafka topic partitions) across tasks, so even if the
> assignment of tasks across workers remains unchanged, the assignment of
> source partitions across those workers might. Connect doesn't do any
> inspection of task configurations at the moment, and without expansions to
> the Connector/Task API, it'd likely be impossible to get information from
> tasks about their source partition assignments. With that in mind, I think
> we may want to leave the door open for more intelligent task fencing but
> not include that level of optimization at this stage. Does that sound fair
> to you?
>
> There is one case that I've identified where we can cheaply optimize right
> now: single-task connectors, such as the Debezium CDC source connectors. If
> a connector is configured at some point with a single task, then some other
> part of its configuration is altered but the single-task aspect remains,
> the leader doesn't have to worry about fencing out the older task as the
> new task's producer will do that automatically. In this case, the leader
> can skip the producer fencing round and just write the new task count
> record straight to the config topic. I've added this case to the KIP; if it
> overcomplicates things I'm happy to remove it, but seeing as it serves a
> real use case and comes fairly cheap, I figured it'd be best to include
> now.
>
> Thanks again for your feedback; if you have other thoughts I'd love to hear
> them!
>
> Cheers,
>
> Chris
>
> On Mon, Feb 22, 2021 at 1:57 PM Chris Egerton  wrote:
>
> > Hi Gwen,
> >
> > Thanks for the feedback!
> >
> > 0.
> > That's a great point; I've updated the motivation section with that
> > rationale.
> >
> > 1.
> > This enables safe "hard downgrades" of clusters where, instead of just
> > disabling exactly-once support on each worker, each worker is rolled back
> > to an earlier version of the Connect framework that doesn't support
> > per-connector offsets topics altogether. Those workers would go back to
> all
> > using a global offsets topic, and any offsets stored in per-connector
> > topics would be lost to those workers. This would cause a large number of
> > duplicates to flood the downstream system. While technically permissible
> > given that the user in this case will have knowingly switched to a
> version
> > of the Connect framework that doesn't support exactly-once source
> > connectors (and is therefore susceptible to duplicate delivery of
> records),
> > the user experience in this case could be pretty bad. A similar situation
> > is if users switch back from per-connector offsets topics to the global
> > offsets topic.
> > I've tried to make this more clear in the KIP by linking to the "Hard
> > downgrade" section from the proposed design, and by expanding on the
> > rationale provided for redundant global offset writes in the "Hard
> > downgrade" section. Let me know if you think this could be improved or
> > think a different approach is warranted.
> >
> > 2.
> > I think the biggest difference between Connect and Streams comes from the
> > fact that Connect allows users to create connectors that target different
> > Kafka clusters on the same worker. This hasn't been a problem in the past
> > because workers use two separate producers to write offset data and
> source
> > connector records, but in order to write offsets and records in the same
> > transaction, it becomes necessary to use a single producer, which also
> > requires that the internal offsets topic be hosted on the same Kafka
> > cluster that the connector is targeting.
> > This may sound like a niche use case but it was actually one of the
> > driving factors behind KIP-458 (
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy
> ),
> > and it's a feature that we rely on heavily today.
> > If there's too much going on in this KIP and we'd prefer to drop support
> > for running that type of setup with exactly-

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-02-22 Thread Chris Egerton
Hi Guozhang,

Thanks for taking a look, and for your suggestion!

I think there is room for more intelligent fencing strategies, but I think
that it'd have to be more nuanced than one based on task->worker
assignments. Connectors can arbitrarily reassign source partitions (such as
database tables, or Kafka topic partitions) across tasks, so even if the
assignment of tasks across workers remains unchanged, the assignment of
source partitions across those workers might. Connect doesn't do any
inspection of task configurations at the moment, and without expansions to
the Connector/Task API, it'd likely be impossible to get information from
tasks about their source partition assignments. With that in mind, I think
we may want to leave the door open for more intelligent task fencing but
not include that level of optimization at this stage. Does that sound fair
to you?

There is one case that I've identified where we can cheaply optimize right
now: single-task connectors, such as the Debezium CDC source connectors. If
a connector is configured at some point with a single task, then some other
part of its configuration is altered but the single-task aspect remains,
the leader doesn't have to worry about fencing out the older task as the
new task's producer will do that automatically. In this case, the leader
can skip the producer fencing round and just write the new task count
record straight to the config topic. I've added this case to the KIP; if it
overcomplicates things I'm happy to remove it, but seeing as it serves a
real use case and comes fairly cheap, I figured it'd be best to include now.
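
For anyone who wants the mechanism spelled out: a producer that calls
initTransactions() with a given transactional.id bumps the epoch for that ID,
and any older producer still using the same ID is fenced out (it will see
ProducerFencedException on its next transactional operation). A bare-bones
illustration, with a made-up transactional ID and topic (the KIP derives the
real IDs from the group ID, connector name, and task number):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SingleTaskFencingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Illustrative transactional ID only.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "connect-cluster-my-connector-0");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Registers this producer with the transaction coordinator and bumps
            // the epoch for this transactional ID, fencing out any still-running
            // (zombie) producer that was created with the same ID.
            producer.initTransactions();

            producer.beginTransaction();
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            producer.commitTransaction();
        }
    }
}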

Thanks again for your feedback; if you have other thoughts I'd love to hear
them!

Cheers,

Chris

On Mon, Feb 22, 2021 at 1:57 PM Chris Egerton  wrote:

> Hi Gwen,
>
> Thanks for the feedback!
>
> 0.
> That's a great point; I've updated the motivation section with that
> rationale.
>
> 1.
> This enables safe "hard downgrades" of clusters where, instead of just
> disabling exactly-once support on each worker, each worker is rolled back
> to an earlier version of the Connect framework that doesn't support
> per-connector offsets topics altogether. Those workers would go back to all
> using a global offsets topic, and any offsets stored in per-connector
> topics would be lost to those workers. This would cause a large number of
> duplicates to flood the downstream system. While technically permissible
> given that the user in this case will have knowingly switched to a version
> of the Connect framework that doesn't support exactly-once source
> connectors (and is therefore susceptible to duplicate delivery of records),
> the user experience in this case could be pretty bad. A similar situation
> is if users switch back from per-connector offsets topics to the global
> offsets topic.
> I've tried to make this more clear in the KIP by linking to the "Hard
> downgrade" section from the proposed design, and by expanding on the
> rationale provided for redundant global offset writes in the "Hard
> downgrade" section. Let me know if you think this could be improved or
> think a different approach is warranted.
>
> 2.
> I think the biggest difference between Connect and Streams comes from the
> fact that Connect allows users to create connectors that target different
> Kafka clusters on the same worker. This hasn't been a problem in the past
> because workers use two separate producers to write offset data and source
> connector records, but in order to write offsets and records in the same
> transaction, it becomes necessary to use a single producer, which also
> requires that the internal offsets topic be hosted on the same Kafka
> cluster that the connector is targeting.
> This may sound like a niche use case but it was actually one of the
> driving factors behind KIP-458 (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy),
> and it's a feature that we rely on heavily today.
> If there's too much going on in this KIP and we'd prefer to drop support
> for running that type of setup with exactly-once source connectors for now,
> I can propose this in a separate KIP. I figured it'd be best to get this
> out of the way with the initial introduction of exactly-once source support
> in order to make adoption by existing Connect users as seamless as
> possible, and since we'd likely have to address this issue before being
> able to utilize the feature ourselves.
> I switched around the ordering of the "motivation" section for
> per-connector offsets topics to put the biggest factor first, and called it
> out as the major difference between Connect and Streams in this case.
>
> 3.
> Fair enough, after giving it a little more thought I agree that allowing
> users to shoot themselves in the foot is a bad idea here. There's also some
> precedent for handling this with the "enable.idempotence" and "
> transactional.id" producer properties; if you specify a transactional ID
> but do

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-02-22 Thread Chris Egerton
Hi Gwen,

Thanks for the feedback!

0.
That's a great point; I've updated the motivation section with that
rationale.

1.
This enables safe "hard downgrades" of clusters where, instead of just
disabling exactly-once support on each worker, each worker is rolled back
to an earlier version of the Connect framework that doesn't support
per-connector offsets topics altogether. Those workers would go back to all
using a global offsets topic, and any offsets stored in per-connector
topics would be lost to those workers. This would cause a large number of
duplicates to flood the downstream system. While technically permissible
given that the user in this case will have knowingly switched to a version
of the Connect framework that doesn't support exactly-once source
connectors (and is therefore susceptible to duplicate delivery of records),
the user experience in this case could be pretty bad. A similar situation
is if users switch back from per-connector offsets topics to the global
offsets topic.
I've tried to make this more clear in the KIP by linking to the "Hard
downgrade" section from the proposed design, and by expanding on the
rationale provided for redundant global offset writes in the "Hard
downgrade" section. Let me know if you think this could be improved or
think a different approach is warranted.

2.
I think the biggest difference between Connect and Streams comes from the
fact that Connect allows users to create connectors that target different
Kafka clusters on the same worker. This hasn't been a problem in the past
because workers use two separate producers to write offset data and source
connector records, but in order to write offsets and records in the same
transaction, it becomes necessary to use a single producer, which also
requires that the internal offsets topic be hosted on the same Kafka
cluster that the connector is targeting.
This may sound like a niche use case but it was actually one of the driving
factors behind KIP-458 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy),
and it's a feature that we rely on heavily today.
If there's too much going on in this KIP and we'd prefer to drop support
for running that type of setup with exactly-once source connectors for now,
I can propose this in a separate KIP. I figured it'd be best to get this
out of the way with the initial introduction of exactly-once source support
in order to make adoption by existing Connect users as seamless as
possible, and since we'd likely have to address this issue before being
able to utilize the feature ourselves.
I switched around the ordering of the "motivation" section for
per-connector offsets topics to put the biggest factor first, and called it
out as the major difference between Connect and Streams in this case.

3.
Fair enough, after giving it a little more thought I agree that allowing
users to shoot themselves in the foot is a bad idea here. There's also some
precedent for handling this with the "enable.idempotence" and "
transactional.id" producer properties; if you specify a transactional ID
but don't specify a value for idempotence, the producer just does the right
thing for you by enabling idempotence and emitting a log message letting
you know that it's done so. I've adjusted the proposed behavior to try to
use a similar approach; let me know what you think.
There is the potential gap here where, sometime in the future, a third
accepted value for the "isolation.level" property is added to the consumer
API and users will be unable to use that new value for their worker. But
the likelihood of footgunning seems much greater than this scenario, and we
can always address expansions to the consumer API with changes to the
Connect framework as well if/when that becomes necessary.
I've also added a similar note to the source task's transactional ID
property; user overrides of it will also be disabled.

4.
Yeah, that's mostly correct. I tried to touch on this in the "Motivation"
section with this bit:
> The Connect framework periodically writes source task offsets to an
internal Kafka topic at a configurable interval, once the source records
that they correspond to have been successfully sent to Kafka.
I've expanded on this in the "Offset (and record) writes" section, and I've
tweaked the "Motivation" section a little bit to add a link to the relevant
config property and to make the language a little more accurate.
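For anyone following along, the property in question is the worker-level offset
flush interval; a minimal snippet from a worker config (the value shown is, as
far as I know, the default):

    # worker config: how often the framework commits source task offsets
    offset.flush.interval.ms=60000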

5.
This isn't quite as bad as stop-the-world; more like stop-the-connector. If
a worker is running a dozen connectors and one of those (that happens to be
a source) is reconfigured, only the tasks for that connector will be
preemptively halted, and all other tasks and connectors will continue
running. This is pretty close to the current behavior with incremental
rebalancing; the only difference is that, instead of waiting for the
rebalance to complete before halting the tasks for that connector, the
worker will halt them in preparation for the rebalance.

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-02-21 Thread Guozhang Wang
Hello Chris,

Thanks for the great write-up! I was mainly reviewing the admin
fenceProducers API of the KIP. I think it makes sense overall. I'm just
wondering if we can go one step further: instead of forcing all producers of
the previous generation to be fenced out, could we try to achieve a "partial
rebalance" by first generating the new assignment, and then, based on the new
assignment, only fencing out the producers involved in tasks that are indeed
migrated? Just a wild thought to bring up for debate.

Guozhang



On Sat, Feb 20, 2021 at 10:20 PM Gwen Shapira  wrote:

> Hey Chris,
>
> Thank you for the proposal. Few questions / comments:
>
> 0. It may be worth pointing out in the motivation section that
> source-connector exactly once is more important than sink connector
> exactly once, since many target systems will have unique key
> constraint mechanisms that will prevent duplicates. Kafka does not
> have any such constraints, so without this KIP-618, exactly once won't
> be possible.
> 1. I am not clear why we need the worker to async copy offsets from
> the connector-specific offset topic to a global offsets topic
> 2. While the reasoning you have for offset topic per connector appears
> sound, it doesn't add up with the use of transactions in KafkaStreams.
> My understanding is that KafkaStreams uses a shared offsets topic with
> all the other consumers, and (apparently) corrupting data and delays
> caused by other tenants are a non-issue. Perhaps you can comment on how
> Connect is different? In general much of the complexity in the KIP is
> related to the separate offset topic, and I'm wondering if this can be
> avoided. The migration use-case is interesting, but not related to
> exactly-once and can be handled separately.
> 3. Allowing users to override the isolation level for the offset
> reader, even when exactly-once is enabled, thereby disabling
> exactly-once in a non-obvious way. I get that connect usually allows
> users to shoot themselves in the foot, but are there any actual
> benefits for allowing it in this case? Maybe it is better if we don't?
> I don't find the argument that we always did this to be particularly
> compelling.
> 4. It isn't stated explicitly, but it sounds like connect or source
> connectors already have some batching mechanism, and that transaction
> boundaries will match the batches (i.e. each batch will be a
> transaction?). If so, worth being explicit.
> 5. "When a rebalance is triggered, before (re-)joining the cluster
> group, all workers will preemptively stop all tasks of all source
> connectors for which task configurations are present in the config
> topic after the latest task count record" - how will this play with
> the incremental rebalances? isn't this exactly the stop-the-world
> rebalance we want to avoid?
> 6. "the worker will instantiate a transactional producer whose
> transactional ID is, by default, the group ID of the cluster (but may
> be overwritten by users using the transactional.id worker property)" -
> If users change transactional.id property, zombie leaders won't get
> fenced (since they will have an older and different transactional id)
>
> Thanks,
>
> Gwen
>
> On Thu, May 21, 2020 at 11:21 PM Chris Egerton 
> wrote:
> >
> > Hi all,
> >
> > I know it's a busy time with the upcoming 2.6 release and I don't expect
> > this to get a lot of traction until that's done, but I've published a KIP
> > for allowing atomic commit of offsets and records for source connectors
> and
> > would appreciate your feedback:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
> >
> > This feature should make it possible to implement source connectors with
> > exactly-once delivery guarantees, and even allow a wide range of existing
> > source connectors to provide exactly-once delivery guarantees with no
> > changes required.
> >
> > Cheers,
> >
> > Chris
>
>
>
> --
> Gwen Shapira
> Engineering Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-02-20 Thread Gwen Shapira
Hey Chris,

Thank you for the proposal. Few questions / comments:

0. It may be worth pointing out in the motivation section that
source-connector exactly once is more important than sink connector
exactly once, since many target systems will have unique key
constraint mechanisms that will prevent duplicates. Kafka does not
have any such constraints, so without this KIP-618, exactly once won't
be possible.
1. I am not clear why we need the worker to async copy offsets from
the connector-specific offset topic to a global offsets topic
2. While the reasoning you have for offset topic per connector appears
sound, it doesn't add up with the use of transactions in KafkaStreams.
My understanding is that KafkaStreams uses a shared offsets topic with
all the other consumers, and (apparently) corrupting data and delays
caused by other tenants are a non-issue. Perhaps you can comment on how
Connect is different? In general much of the complexity in the KIP is
related to the separate offset topic, and I'm wondering if this can be
avoided. The migration use-case is interesting, but not related to
exactly-once and can be handled separately.
3. Allowing users to override the isolation level for the offset
reader, even when exactly-once is enabled, thereby disabling
exactly-once in a non-obvious way. I get that connect usually allows
users to shoot themselves in the foot, but are there any actual
benefits for allowing it in this case? Maybe it is better if we don't?
I don't find the argument that we always did this to be particularly
compelling.
4. It isn't stated explicitly, but it sounds like connect or source
connectors already have some batching mechanism, and that transaction
boundaries will match the batches (i.e. each batch will be a
transaction?). If so, worth being explicit.
5. "When a rebalance is triggered, before (re-)joining the cluster
group, all workers will preemptively stop all tasks of all source
connectors for which task configurations are present in the config
topic after the latest task count record" - how will this play with
the incremental rebalances? isn't this exactly the stop-the-world
rebalance we want to avoid?
6. "the worker will instantiate a transactional producer whose
transactional ID is, by default, the group ID of the cluster (but may
be overwritten by users using the transactional.id worker property)" -
If users change transactional.id property, zombie leaders won't get
fenced (since they will have an older and different transactional id)

Thanks,

Gwen

On Thu, May 21, 2020 at 11:21 PM Chris Egerton  wrote:
>
> Hi all,
>
> I know it's a busy time with the upcoming 2.6 release and I don't expect
> this to get a lot of traction until that's done, but I've published a KIP
> for allowing atomic commit of offsets and records for source connectors and
> would appreciate your feedback:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
>
> This feature should make it possible to implement source connectors with
> exactly-once delivery guarantees, and even allow a wide range of existing
> source connectors to provide exactly-once delivery guarantees with no
> changes required.
>
> Cheers,
>
> Chris



-- 
Gwen Shapira
Engineering Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-02-02 Thread Chris Egerton
Hi Jason,

Fine by me; I wanted to be conservative with the return type but the case
you've outlined sounds enticing enough that adding a little flexibility to
the API seems warranted. I've added your suggestion to the proposed admin
API expansions; let me know what you think.
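To give a feel for the shape of that expansion, here's a rough sketch; the type
and method names are illustrative only, and the KIP page remains the source of
truth:

    import java.util.Collection;
    import org.apache.kafka.common.KafkaFuture;

    // Hypothetical sketch of the expanded admin API; names are illustrative only.
    interface FenceProducersSketch {

        // Fence out producers using any of the given transactional IDs, exposing
        // the producer ID and epoch from each InitProducerId response instead of
        // returning Void.
        FenceProducersResult fenceProducers(Collection<String> transactionalIds);

        interface FenceProducersResult {
            // Per-transactional-ID future with the fenced producer's ID and epoch
            KafkaFuture<ProducerIdAndEpoch> producerId(String transactionalId);

            // Completes once every requested transactional ID has been fenced
            KafkaFuture<Void> all();
        }

        // Simple holder for the state returned by InitProducerId
        interface ProducerIdAndEpoch {
            long producerId();
            short epoch();
        }
    }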

Cheers,

Chris

On Mon, Feb 1, 2021 at 3:38 PM Jason Gustafson  wrote:

> Hi Chris,
>
> If we add the new `fenceProducers` admin API, can we return the information
> from the `InitProducerId` response (i.e. producer id and epoch)? We may not
> have a use case for it yet, but I don't see any harm exposing it for the
> future. For example, we could allow this state to be provided to the
> Producer instance on initialization, which would save the need for the
> second `InitProducerId` request in the current proposal. Also, the `Void`
> type doesn't give us much room for extension.
>
> -Jason
>
>
> On Mon, Jan 25, 2021 at 7:29 AM Chris Egerton  wrote:
>
> > Hi Ning,
> >
> > Apologies for the delay in response. I realized after publishing the KIP
> > that there were some finer points I hadn't considered in my design and
> that
> > it was far from providing exactly-once guarantees. In response to your
> > questions:
> >
> > 1) The goal of the KIP is to ensure the accuracy of the offsets that the
> > framework provides to source tasks; if tasks choose to manage offsets
> > outside of the framework, they're on their own. So, the source records
> and
> > their offsets will be written/committed to Kafka, and the task will be
> > provided them on startup, but it (or really, its predecessor) may not
> have
> > had time to do cleanup on resources associated with those records before
> > being killed.
> >
> > 2) I've cleaned up this section and removed the pseudocode as it seems
> too
> > low-level to be worth discussing in a KIP. I'll try to summarize here,
> > though: task.commit() is not what causes offsets provided to the
> framework
> > by tasks to be committed; it's simply a follow-up hook provided out of
> > convenience to tasks so that they can clean up resources associated with
> > the most recent batch of records (by ack'ing JMS messages, for example).
> > The Connect framework uses an internal Kafka topic to store source task
> > offsets.
> >
> > 3) In order to benefit from the improvements proposed in this KIP, yes,
> the
> > single source-of-truth should be the OffsetStorageReader provided to the
> > task by the Connect framework, at least at startup. After startup, tasks
> > should ideally bookkeep their own offset progress as each request to read
> > offsets requires a read to the end of the offsets topic, which can be
> > expensive in some cases.
> >
> > I've since expanded the KIP to include general exactly-once support for
> > source connectors that should cover the points I neglected in my initial
> > design, so it should be ready for review again.
> >
> > Cheers,
> >
> > Chris
> >
> > On Mon, Jul 27, 2020 at 11:42 PM Ning Zhang 
> > wrote:
> >
> > > Hello Chris,
> > >
> > > That is an interesting KIP. I have a couple of questions:
> > >
> > > (1) in the pseudo-code section, what if the failure happens between 4(b)
> > > and 5(a), meaning after the producer commits the transaction and before
> > > task.commitRecord()?
> > >
> > > (2) in the "source task life time" section, what is the difference between
> > > "commit offset" and "offsets to commit"? Given that the offset storage
> > > can be a Kafka topic (KafkaOffsetBackingStore.java) and a producer can
> > > only produce to a Kafka topic, are the topics the same? (the topic that
> > > the producer writes offsets to and the topic that task.commit() commits to)
> > >
> > > (3) for the JDBC source task, it relies on `context.offsetStorageReader()` (
> > > https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java#L140
> > > )
> > > to retrieve the previously committed offset (whether from a fresh start
> > > or a resume from failure). So it seems that the single source of truth
> > > for where to consume from (the last known / committed position) is the
> > > offset storage (e.g. a Kafka topic) managed by the periodic task.commit()?
> > >
> > > On 2020/05/22 06:20:51, Chris Egerton  wrote:
> > > > Hi all,
> > > >
> > > > I know it's a busy time with the upcoming 2.6 release and I don't
> > expect
> > > > this to get a lot of traction until that's done, but I've published a
> > KIP
> > > > for allowing atomic commit of offsets and records for source
> connectors
> > > and
> > > > would appreciate your feedback:
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
> > > >
> > > > This feature should make it possible to implement source connectors
> > with
> > > > exactly-once delivery guarantees, and even allow a wide range of
> > existing
> > > > source connectors to provide exactly-once delivery guarantees with no
> > > > changes required

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-02-01 Thread Jason Gustafson
Hi Chris,

If we add the new `fenceProducers` admin API, can we return the information
from the `InitProducerId` response (i.e. producer id and epoch)? We may not
have a use case for it yet, but I don't see any harm exposing it for the
future. For example, we could allow this state to be provided to the
Producer instance on initialization, which would save the need for the
second `InitProducerId` request in the current proposal. Also, the `Void`
type doesn't give us much room for extension.

-Jason


On Mon, Jan 25, 2021 at 7:29 AM Chris Egerton  wrote:

> Hi Ning,
>
> Apologies for the delay in response. I realized after publishing the KIP
> that there were some finer points I hadn't considered in my design and that
> it was far from providing exactly-once guarantees. In response to your
> questions:
>
> 1) The goal of the KIP is to ensure the accuracy of the offsets that the
> framework provides to source tasks; if tasks choose to manage offsets
> outside of the framework, they're on their own. So, the source records and
> their offsets will be written/committed to Kafka, and the task will be
> provided them on startup, but it (or really, its predecessor) may not have
> had time to do cleanup on resources associated with those records before
> being killed.
>
> 2) I've cleaned up this section and removed the pseudocode as it seems too
> low-level to be worth discussing in a KIP. I'll try to summarize here,
> though: task.commit() is not what causes offsets provided to the framework
> by tasks to be committed; it's simply a follow-up hook provided out of
> convenience to tasks so that they can clean up resources associated with
> the most recent batch of records (by ack'ing JMS messages, for example).
> The Connect framework uses an internal Kafka topic to store source task
> offsets.
>
> 3) In order to benefit from the improvements proposed in this KIP, yes, the
> single source-of-truth should be the OffsetStorageReader provided to the
> task by the Connect framework, at least at startup. After startup, tasks
> should ideally bookkeep their own offset progress as each request to read
> offsets requires a read to the end of the offsets topic, which can be
> expensive in some cases.
>
> I've since expanded the KIP to include general exactly-once support for
> source connectors that should cover the points I neglected in my initial
> design, so it should be ready for review again.
>
> Cheers,
>
> Chris
>
> On Mon, Jul 27, 2020 at 11:42 PM Ning Zhang 
> wrote:
>
> > Hello Chris,
> >
> > That is an interesting KIP. I have a couple of questions:
> >
> > (1) in the pseudo-code section, what if the failure happens between 4(b)
> > and 5(a), meaning after the producer commits the transaction and before
> > task.commitRecord()?
> >
> > (2) in the "source task life time" section, what is the difference between
> > "commit offset" and "offsets to commit"? Given that the offset storage
> > can be a Kafka topic (KafkaOffsetBackingStore.java) and a producer can
> > only produce to a Kafka topic, are the topics the same? (the topic that
> > the producer writes offsets to and the topic that task.commit() commits to)
> >
> > (3) for the JDBC source task, it relies on `context.offsetStorageReader()` (
> > https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java#L140
> > )
> > to retrieve the previously committed offset (whether from a fresh start or
> > a resume from failure). So it seems that the single source of truth for
> > where to consume from (the last known / committed position) is the offset
> > storage (e.g. a Kafka topic) managed by the periodic task.commit()?
> >
> > On 2020/05/22 06:20:51, Chris Egerton  wrote:
> > > Hi all,
> > >
> > > I know it's a busy time with the upcoming 2.6 release and I don't
> expect
> > > this to get a lot of traction until that's done, but I've published a
> KIP
> > > for allowing atomic commit of offsets and records for source connectors
> > and
> > > would appreciate your feedback:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
> > >
> > > This feature should make it possible to implement source connectors
> with
> > > exactly-once delivery guarantees, and even allow a wide range of
> existing
> > > source connectors to provide exactly-once delivery guarantees with no
> > > changes required.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> >
>


Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2021-01-25 Thread Chris Egerton
Hi Ning,

Apologies for the delay in response. I realized after publishing the KIP
that there were some finer points I hadn't considered in my design and that
it was far from providing exactly-once guarantees. In response to your
questions:

1) The goal of the KIP is to ensure the accuracy of the offsets that the
framework provides to source tasks; if tasks choose to manage offsets
outside of the framework, they're on their own. So, the source records and
their offsets will be written/committed to Kafka, and the task will be
provided them on startup, but it (or really, its predecessor) may not have
had time to do cleanup on resources associated with those records before
being killed.

2) I've cleaned up this section and removed the pseudocode as it seems too
low-level to be worth discussing in a KIP. I'll try to summarize here,
though: task.commit() is not what causes offsets provided to the framework
by tasks to be committed; it's simply a follow-up hook provided out of
convenience to tasks so that they can clean up resources associated with
the most recent batch of records (by ack'ing JMS messages, for example).
The Connect framework uses an internal Kafka topic to store source task
offsets.
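As a rough illustration of the kind of cleanup I mean, here's a hypothetical
JMS-flavored task; it isn't taken from any real connector, and the JMS calls
are just an example of post-batch resource cleanup:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    // Hypothetical task, purely to illustrate the role of commit(): it is a
    // follow-up cleanup hook, not the mechanism that commits offsets to Kafka.
    public class ExampleJmsSourceTask extends SourceTask {

        private final List<Message> unacked = new ArrayList<>();

        @Override
        public String version() {
            return "0.0.1";
        }

        @Override
        public void start(Map<String, String> props) {
            // set up the JMS connection and session here (omitted)
        }

        @Override
        public List<SourceRecord> poll() {
            // read messages from JMS, remember them so they can be acked later,
            // and return the corresponding SourceRecords (omitted)
            return Collections.emptyList();
        }

        @Override
        public void commit() throws InterruptedException {
            // called by the framework after the most recent batch of records has
            // been handled; a convenient place to ack the messages behind it
            for (Message message : unacked) {
                try {
                    message.acknowledge();
                } catch (JMSException e) {
                    // a real task would log this and decide whether to retry
                }
            }
            unacked.clear();
        }

        @Override
        public void stop() {
            // tear down the JMS connection and session (omitted)
        }
    }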

3) In order to benefit from the improvements proposed in this KIP, yes, the
single source-of-truth should be the OffsetStorageReader provided to the
task by the Connect framework, at least at startup. After startup, tasks
should ideally bookkeep their own offset progress as each request to read
offsets requires a read to the end of the offsets topic, which can be
expensive in some cases.
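In code, that startup pattern looks roughly like this; the partition and offset
keys are whatever the connector chooses, so the names below are illustrative:

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.connect.source.SourceTask;

    // Sketch of the pattern described above: read the committed offset once via
    // the framework's OffsetStorageReader, then bookkeep progress in memory.
    public abstract class OffsetBootstrapSketch extends SourceTask {

        // partition/offset keys are connector-defined; these are illustrative
        private static final Map<String, String> SOURCE_PARTITION =
                Collections.singletonMap("table", "my_table");

        private long currentPosition = 0L;

        @Override
        public void start(Map<String, String> props) {
            Map<String, Object> committed =
                    context.offsetStorageReader().offset(SOURCE_PARTITION);
            if (committed != null && committed.get("position") != null) {
                // resume from the last committed position
                currentPosition = ((Number) committed.get("position")).longValue();
            }
            // from here on, track currentPosition in memory instead of re-reading
            // the offsets topic on every iteration
        }
    }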

I've since expanded the KIP to include general exactly-once support for
source connectors that should cover the points I neglected in my initial
design, so it should be ready for review again.

Cheers,

Chris

On Mon, Jul 27, 2020 at 11:42 PM Ning Zhang  wrote:

> Hello Chris,
>
> That is an interesting KIP. I have a couple of questions:
>
> (1) in the pseudo-code section, what if the failure happens between 4(b)
> and 5(a), meaning after the producer commits the transaction and before
> task.commitRecord()?
>
> (2) in the "source task life time" section, what is the difference between
> "commit offset" and "offsets to commit"? Given that the offset storage can
> be a Kafka topic (KafkaOffsetBackingStore.java) and a producer can only
> produce to a Kafka topic, are the topics the same? (the topic that the
> producer writes offsets to and the topic that task.commit() commits to)
>
> (3) for the JDBC source task, it relies on `context.offsetStorageReader()` (
> https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java#L140)
> to retrieve the previously committed offset (whether from a fresh start or
> a resume from failure). So it seems that the single source of truth for
> where to consume from (the last known / committed position) is the offset
> storage (e.g. a Kafka topic) managed by the periodic task.commit()?
>
> On 2020/05/22 06:20:51, Chris Egerton  wrote:
> > Hi all,
> >
> > I know it's a busy time with the upcoming 2.6 release and I don't expect
> > this to get a lot of traction until that's done, but I've published a KIP
> > for allowing atomic commit of offsets and records for source connectors
> and
> > would appreciate your feedback:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
> >
> > This feature should make it possible to implement source connectors with
> > exactly-once delivery guarantees, and even allow a wide range of existing
> > source connectors to provide exactly-once delivery guarantees with no
> > changes required.
> >
> > Cheers,
> >
> > Chris
> >
>


Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2020-07-27 Thread Ning Zhang
Hello Chris,

That is an interesting KIP. I have a couple of questions:

(1) in the pseudo-code section, what if the failure happens between 4(b) and
5(a), meaning after the producer commits the transaction and before
task.commitRecord()?

(2) in the "source task life time" section, what is the difference between "commit
offset" and "offsets to commit"? Given that the offset storage can be a Kafka
topic (KafkaOffsetBackingStore.java) and a producer can only produce to a Kafka
topic, are the topics the same? (the topic that the producer writes offsets to
and the topic that task.commit() commits to)

(3) for the JDBC source task, it relies on `context.offsetStorageReader()`
(https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java#L140)
to retrieve the previously committed offset (whether from a fresh start or a
resume from failure). So it seems that the single source of truth for where to
consume from (the last known / committed position) is the offset storage (e.g. a
Kafka topic) managed by the periodic task.commit()?

On 2020/05/22 06:20:51, Chris Egerton  wrote: 
> Hi all,
> 
> I know it's a busy time with the upcoming 2.6 release and I don't expect
> this to get a lot of traction until that's done, but I've published a KIP
> for allowing atomic commit of offsets and records for source connectors and
> would appreciate your feedback:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
> 
> This feature should make it possible to implement source connectors with
> exactly-once delivery guarantees, and even allow a wide range of existing
> source connectors to provide exactly-once delivery guarantees with no
> changes required.
> 
> Cheers,
> 
> Chris
> 


[DISCUSS] KIP-618: Atomic commit of source connector records and offsets

2020-05-21 Thread Chris Egerton
Hi all,

I know it's a busy time with the upcoming 2.6 release and I don't expect
this to get a lot of traction until that's done, but I've published a KIP
for allowing atomic commit of offsets and records for source connectors and
would appreciate your feedback:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets

This feature should make it possible to implement source connectors with
exactly-once delivery guarantees, and even allow a wide range of existing
source connectors to provide exactly-once delivery guarantees with no
changes required.

Cheers,

Chris