Hi Chris,

Just a minor question: I can see why the default for exactly.once.support
is requested (you want a good first-run experience, I assume), but it's a
little like engineering a safety catch and then not enabling it. Wouldn't
it be safer to default to required, so that no one can end up without EoS
unless they have explicitly opted out of it?
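
To make the concern concrete, here is a minimal sketch of how I read the two
settings. The enum and method names are hypothetical stand-ins for
illustration, not the Connect API, and the behaviour is my assumption about
the preflight check rather than something confirmed by the KIP text:

```java
import java.util.Optional;

final class ExactlyOncePreflight {
    // Hypothetical stand-ins for illustration; not the real Connect types.
    enum ExactlyOnceSupport { SUPPORTED, UNSUPPORTED }
    enum SupportLevel { REQUESTED, REQUIRED }

    /**
     * Returns an error message if the connector should be rejected,
     * or an empty Optional if it is allowed to start.
     */
    static Optional<String> check(SupportLevel level, ExactlyOnceSupport declared) {
        if (level == SupportLevel.REQUIRED && declared != ExactlyOnceSupport.SUPPORTED) {
            return Optional.of("connector does not declare exactly-once support");
        }
        // REQUESTED (assumed behaviour): the connector starts regardless,
        // even though exactly-once delivery may not actually hold.
        return Optional.empty();
    }

    public static void main(String[] args) {
        for (SupportLevel level : SupportLevel.values()) {
            Optional<String> error = check(level, ExactlyOnceSupport.UNSUPPORTED);
            System.out.println(level + " -> " + error.orElse("connector starts"));
        }
    }
}
```

With requested as the default, the unsupported case passes silently, which is
the "safety catch not enabled" situation I mean.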

Thanks,

Tom

On Tue, Jun 1, 2021 at 4:48 PM Chris Egerton <chr...@confluent.io.invalid>
wrote:

> Hi Gunnar,
>
> Thanks for taking a look! I've addressed the low-hanging fruit in the KIP;
> responses to other comments inline here:
>
> > * TransactionContext: What's the use case for the methods accepting a
> > source record (commitTransaction(SourceRecord record),
> > abortTransaction(SourceRecord record))?
>
> This allows developers to decouple transaction boundaries from record
> batches. If a connector has a configuration that dictates how often it
> returns from "SourceTask::poll", for example, it may be easier to define
> multiple transactions within a single batch or a single transaction across
> several batches than to retrofit the connector's poll logic to work with
> transaction boundaries.
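
A minimal sketch of the decoupling described above, assuming a simplified
model: records are plain strings, and RecordingContext is a hypothetical
stand-in for TransactionContext rather than the real interface. It only shows
how marking individual records as commit points lets transaction boundaries
cut across batch boundaries:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class BoundarySketch {
    // Hypothetical stand-in for TransactionContext; not the Connect API.
    static final class RecordingContext {
        final Set<String> commitAfter = new HashSet<>();

        // Mirrors the idea of commitTransaction(SourceRecord record):
        // commit the open transaction once this record has been dispatched.
        void commitTransaction(String record) {
            commitAfter.add(record);
        }
    }

    /** Groups the records of all batches into the transactions that get committed. */
    static List<List<String>> transactions(List<List<String>> batches, RecordingContext context) {
        List<List<String>> committed = new ArrayList<>();
        List<String> open = new ArrayList<>();
        for (List<String> batch : batches) {
            for (String record : batch) {
                open.add(record);
                if (context.commitAfter.contains(record)) {
                    committed.add(open);
                    open = new ArrayList<>();
                }
            }
        }
        return committed; // anything still in `open` stays uncommitted
    }

    public static void main(String[] args) {
        RecordingContext context = new RecordingContext();
        context.commitTransaction("b");
        context.commitTransaction("d");
        // Two poll() batches: the first is split across two transactions,
        // and the second transaction spans both batches.
        List<List<String>> batches = List.of(List.of("a", "b", "c"), List.of("d"));
        System.out.println(transactions(batches, context));
    }
}
```

Here the batches [a, b, c] and [d] end up as the transactions [a, b] and
[c, d], without the poll logic having to know about either boundary.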
>
> > * SourceTaskContext: Instead of guarding against NSME, is there a way for
> > a connector to query the KC version and thus derive its capabilities? Going
> > forward, a generic API for querying capabilities could be nice, so a
> > connector can query for capabilities of the runtime in a safe and
> > compatible way.
>
> This would be a great quality-of-life improvement for connector and
> framework developers alike, but I think it may be best left for a separate
> KIP. The current approach, clunky though it may be, seems like a nuisance
> at worst. It's definitely worth addressing but I'm not sure we have the
> time to think through all the details thoroughly enough in time for the
> upcoming KIP freeze.
>
> > * SourceConnector: Would it make sense to merge the two methods perhaps
> > and return one enum of { SUPPORTED, NOT_SUPPORTED,
> > SUPPORTED_WITH_BOUNDARIES }?
>
> Hmm... at first glance I like the idea of merging the two methods a lot.
> The one thing that gives me pause is that there may be connectors that
> would like to define their own transaction boundaries without providing
> exactly-once guarantees. We could add UNSUPPORTED_WITH_BOUNDARIES to
> accommodate that, but then, it might actually be simpler to keep the two
> methods separate in case we add some third variable to the mix that would
> also have to be reflected in the possible ExactlyOnceSupport enum values.
>
> > Or, alternatively return an enum from canDefineTransactionBoundaries(),
> > too; even if it only has two values now, that'd allow for extension in the
> > future
>
> This is fine by me; we just have to figure out exactly which enum values
> would be suitable. It's a little clunky but right now I'm toying with
> something like "ConnectorDefinedTransactionBoundaries" with values of
> "SUPPORTED" and "NOT_SUPPORTED" and a default of "NOT_SUPPORTED". If we
> need more granularity in the future then we can deprecate one or both of
> them and add new values. Thoughts?
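
For reference, the enum being toyed with above could look roughly like the
sketch below. The enum name and values are taken from the message; the
no-argument method signature, the SourceConnectorSketch interface, and the two
example connectors are assumptions made purely for illustration:

```java
final class TransactionBoundariesSketch {
    // Name and values as floated in the message above; not a final API.
    enum ConnectorDefinedTransactionBoundaries { SUPPORTED, NOT_SUPPORTED }

    // Hypothetical stand-in for the relevant part of SourceConnector.
    interface SourceConnectorSketch {
        // Default of NOT_SUPPORTED, so existing connectors need no changes.
        default ConnectorDefinedTransactionBoundaries canDefineTransactionBoundaries() {
            return ConnectorDefinedTransactionBoundaries.NOT_SUPPORTED;
        }
    }

    // A connector written before the method existed inherits the default.
    static final class LegacyConnector implements SourceConnectorSketch { }

    // A connector that wants to define its own boundaries opts in explicitly.
    static final class BoundaryAwareConnector implements SourceConnectorSketch {
        @Override
        public ConnectorDefinedTransactionBoundaries canDefineTransactionBoundaries() {
            return ConnectorDefinedTransactionBoundaries.SUPPORTED;
        }
    }

    public static void main(String[] args) {
        System.out.println(new LegacyConnector().canDefineTransactionBoundaries());
        System.out.println(new BoundaryAwareConnector().canDefineTransactionBoundaries());
    }
}
```

New values could later be added to the enum, and old ones deprecated, without
changing the method's return type.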
>
> > And one general question: in Debezium, we have some connectors that
> > produce records "out-of-band" to a schema history topic via their own
> > custom producer. Is there any way envisionable where such a producer would
> > participate in the transaction managed by the KC runtime environment?
>
> To answer the question exactly as asked: no; transactions cannot be shared
> across producers and until/unless that is changed (which seems unlikely)
> this won't be possible. However, I'm curious why a source connector would
> spin up its own producer instead of using "SourceTask::poll" to provide
> records to Connect. Is it easier to consume from that topic when the
> connector can define its own (de)serialization format? I'm optimistic that
> if we understand the use case for the separate producer we may still be
> able to help bridge the gap here, one way or another.
>
> > One follow-up question after thinking some more about this; is there any
> > limit in terms of duration or size of in-flight, connector-controlled
> > transactions? In case of Debezium for instance, there may be cases where we
> > tail the TX log from an upstream source database, not knowing whether the
> > events we receive belong to a committed or aborted transaction. Would it be
> > valid to emit all these events via a transactional task, and in case we
> > receive a ROLLBACK event eventually, to abort the pending Kafka
> > transaction? Such source transactions could be running for a long time
> > potentially, e.g. hours or days (at least in theory). Or would this sort of
> > usage not be considered a reasonable one?
>
> I think the distinction between reasonable and unreasonable usage here is
> likely dependent on use cases that people are trying to satisfy with their
> connector, but if I had to guess, I'd say that a different approach is
> probably warranted in most cases if the transaction spans across entire
> days at a time. If there's no concern about data not being visible to
> downstream consumers until its transaction is committed, and the number of
> records in the transaction isn't so large that the amount of memory
> required to buffer them all locally on a consumer before delivering them to
> the downstream application becomes unreasonable, it would technically be
> possible though. Connect users would have to be mindful of the following:
>
> - A separate offsets topic for the connector would be highly recommended in
> order to avoid crippling other connectors with hanging transactions
> - The producer-level transaction.timeout.ms property
> (https://kafka.apache.org/28/documentation.html#producerconfigs_transaction.timeout.ms),
> which can be configured in connectors either via the worker-level
> producer.transaction.timeout.ms or connector-level
> producer.override.transaction.timeout.ms property, would have to be high
> enough to allow for transactions that stay open for long periods of time
> (the default is 1 minute, so this would almost certainly have to be
> adjusted)
> - The broker-level transaction.max.timeout.ms property
> (https://kafka.apache.org/28/documentation.html#brokerconfigs_transaction.max.timeout.ms)
> would have to be at least as high as the transaction timeout necessary for
> the task (default is 15 minutes, so this would probably need to be
> adjusted)
> - The broker-level transactional.id.expiration.ms property
> (https://kafka.apache.org/28/documentation.html#brokerconfigs_transactional.id.expiration.ms)
> would have to be high enough to not automatically expire the task's
> producer if there was a long enough period without new records; default is
> 7 days, so this would probably be fine in most scenarios
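
The three timeout constraints in the list above can be sketched as a small
check. The class and method names are hypothetical, and the defaults are the
ones quoted in the list; this is an illustration of the reasoning, not code
from Connect or the broker:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

final class LongTransactionCheck {
    // Defaults as quoted in the list above.
    static final Duration DEFAULT_TRANSACTION_TIMEOUT = Duration.ofMinutes(1);
    static final Duration DEFAULT_BROKER_MAX_TIMEOUT = Duration.ofMinutes(15);
    static final Duration DEFAULT_TRANSACTIONAL_ID_EXPIRATION = Duration.ofDays(7);

    /** Lists the properties that would need raising for the given workload. */
    static List<String> problems(Duration longestTransaction,
                                 Duration longestIdlePeriod,
                                 Duration transactionTimeout,
                                 Duration brokerMaxTimeout,
                                 Duration transactionalIdExpiration) {
        List<String> problems = new ArrayList<>();
        // The producer must be allowed to keep a transaction open long enough.
        if (transactionTimeout.compareTo(longestTransaction) < 0) {
            problems.add("transaction.timeout.ms");
        }
        // The broker must permit a timeout at least that long.
        if (brokerMaxTimeout.compareTo(longestTransaction) < 0) {
            problems.add("transaction.max.timeout.ms");
        }
        // The transactional ID must not expire during quiet periods.
        if (transactionalIdExpiration.compareTo(longestIdlePeriod) < 0) {
            problems.add("transactional.id.expiration.ms");
        }
        return problems;
    }

    public static void main(String[] args) {
        // Example workload (assumed): six-hour transactions, idle for up to a day.
        System.out.println(problems(Duration.ofHours(6), Duration.ofDays(1),
                DEFAULT_TRANSACTION_TIMEOUT, DEFAULT_BROKER_MAX_TIMEOUT,
                DEFAULT_TRANSACTIONAL_ID_EXPIRATION));
    }
}
```

For that assumed workload the defaults flag the two timeout properties but not
the ID expiration, which matches the "probably fine" note on the last item.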
>
> Thanks again for taking a look; insight from connector developers is
> tremendously valuable here!
>
> Cheers,
>
> Chris
>
> On Thu, May 27, 2021 at 6:35 PM Gunnar Morling
> <gunnar.morl...@googlemail.com.invalid> wrote:
>
> > Chris,
> >
> > One follow-up question after thinking some more about this; is there any
> > limit in terms of duration or size of in-flight, connector-controlled
> > transactions? In case of Debezium for instance, there may be cases where we
> > tail the TX log from an upstream source database, not knowing whether the
> > events we receive belong to a committed or aborted transaction. Would it be
> > valid to emit all these events via a transactional task, and in case we
> > receive a ROLLBACK event eventually, to abort the pending Kafka
> > transaction? Such source transactions could be running for a long time
> > potentially, e.g. hours or days (at least in theory). Or would this sort of
> > usage not be considered a reasonable one?
> >
> > Thanks,
> >
> > --Gunnar
> >
> >
> > On Thu, May 27, 2021 at 11:15 PM Gunnar Morling
> > <gunnar.morl...@googlemail.com> wrote:
> >
> > > Chris, all,
> > >
> > > I've just read KIP-618, and let me congratulate you first of all for
> > > this impressive piece of work! Here are a few small suggestions and
> > > questions I had while reading:
> > >
> > > * TransactionContext: What's the use case for the methods accepting a
> > > source record (commitTransaction(SourceRecord record),
> > > abortTransaction(SourceRecord record))?
> > > * SourceTaskContext: Typo in "when the sink connector is deployed" ->
> > > source task
> > > * SourceTaskContext: Instead of guarding against NSME, is there a way
> > > for a connector to query the KC version and thus derive its
> > > capabilities? Going forward, a generic API for querying capabilities
> > > could be nice, so a connector can query for capabilities of the runtime
> > > in a safe and compatible way.
> > > * SourceConnector: exactlyOnceSupport() -> false return value doesn't
> > > match
> > > * SourceConnector: Would it make sense to merge the two methods perhaps
> > > and return one enum of { SUPPORTED, NOT_SUPPORTED,
> > > SUPPORTED_WITH_BOUNDARIES }? Or, alternatively return an enum
> > > from canDefineTransactionBoundaries(), too; even if it only has two
> > > values now, that'd allow for extension in the future
> > >
> > > And one general question: in Debezium, we have some connectors that
> > > produce records "out-of-band" to a schema history topic via their own
> > > custom producer. Is there any way envisionable where such a producer
> > > would participate in the transaction managed by the KC runtime
> > > environment?
> > >
> > > Thanks a lot,
> > >
> > > --Gunnar
> > >
> > >
> > > On Mon, May 24, 2021 at 6:45 PM Chris Egerton
> > > <chr...@confluent.io.invalid> wrote:
> > >
> > >> Hi all,
> > >>
> > >> Wanted to note here that I've updated the KIP document to include the
> > >> changes discussed recently. They're mostly located in the "Public
> > >> Interfaces" section. I suspect discussion hasn't concluded yet and
> > >> there will probably be a few more changes to come, but wanted to take
> > >> the opportunity to provide a snapshot of what the current design looks
> > >> like.
> > >>
> > >> Cheers,
> > >>
> > >> Chris
> > >>
> > >> On Fri, May 21, 2021 at 4:32 PM Chris Egerton <chr...@confluent.io>
> > >> wrote:
> > >>
> > >> > Hi Tom,
> > >> >
> > >> > Wow, I was way off base! I was thinking that the intent of the
> > >> > fencible producer was to employ it by default with 3.0, as opposed to
> > >> > only after the worker-level "exactly.once.source.enabled" property
> > >> > was flipped on. You are correct that with the case you were actually
> > >> > describing, there would be no heightened ACL requirements, and that
> > >> > it would leave room in the future for exactly-once to be disabled on
> > >> > a per-connector basis (as long as all the workers in the cluster
> > >> > already had "exactly.once.source.enabled" set to "true") with no
> > >> > worries about breaking changes.
> > >> >
> > >> > I agree that this is something for another KIP; even if we could
> > >> > squeeze it in in time for this release, it might be a bit much for
> > >> > new users to take in all at once. But I can add it to the doc as
> > >> > "future work" since it's a promising idea that could prove valuable
> > >> > to someone who might need per-connector granularity in the future.
> > >> >
> > >> > Thanks for clearing things up; in retrospect your comments make a lot
> > >> > more sense now, and I hope I've sufficiently addressed them by now.
> > >> >
> > >> > PSA for you and everyone else--I plan on updating the doc next week
> > >> > with the new APIs for connector-defined transaction boundaries,
> > >> > user-configurable transaction boundaries (i.e., poll vs. interval vs.
> > >> > connectors), and preflight checks for exactly-once validation
> > >> > (required vs. requested).
> > >> >
> > >> > Cheers,
> > >> >
> > >> > Chris
> > >> >
> > >> > On Fri, May 21, 2021 at 7:14 AM Tom Bentley <tbent...@redhat.com>
> > >> wrote:
> > >> >
> > >> >> Hi Chris,
> > >> >>
> > >> >> Thanks for continuing to entertain some of these ideas.
> > >> >>
> > >> >> On Fri, May 14, 2021 at 5:06 PM Chris Egerton
> > >> <chr...@confluent.io.invalid
> > >> >> >
> > >> >> wrote:
> > >> >>
> > >> >> > [...]
> > >> >> >
> > >> >> > That's true, but we do go from three static ACLs (write/describe
> > >> >> > on a fixed transactional ID, and idempotent write on a fixed
> > >> >> > cluster) to a dynamic collection of ACLs.
> > >> >> >
> > >> >>
> > >> >> I'm not quite sure I follow, maybe I've lost track. To be clear, I
> > >> >> was suggesting the use of a 'fencing producer' only in clusters with
> > >> >> exactly.once.source.enabled=true where I imagined the key difference
> > >> >> between the exactly once and fencing cases was how the producer was
> > >> >> configured/used (transactional vs this new fencing semantic). I
> > >> >> think the ACL requirements for connector producer principals would
> > >> >> therefore be the same as currently described in the KIP. The same is
> > >> >> true for the worker principals (which is the only breaking change
> > >> >> you give in the KIP). So I don't think the fencing idea changes the
> > >> >> backwards compatibility story that's already in the KIP, just allows
> > >> >> a safe per-connector exactly.once=disabled option to be supported
> > >> >> (with required as requested as we already discussed).
> > >> >>
> > >> >> But I'm wondering whether I've overlooked something.
> > >> >>
> > >> >> > Ultimately I think it may behoove us to err on the side of
> > >> >> > reducing the breaking changes here for now and saving them for 4.0
> > >> >> > (or some later major release), but would be interested in thoughts
> > >> >> > from you and others.
> > >> >> >
> > >> >>
> > >> >> Difficult to answer (given I think I might be missing something).
> > >> >> If there are breaking changes then I don't disagree. It's difficult
> > >> >> to reason about big changes like this without some practical
> > >> >> experience.
> > >> >> If there are not, then I think we could also implement the whole
> > >> >> exactly.once=disabled thing in a later KIP without additional
> > >> >> breaking changes (i.e. some time in 3.x), right?
> > >> >>
> > >> >>
> > >> >> > > Guozhang also has a (possible) use case for a fencing-only
> > >> >> > > producer (https://issues.apache.org/jira/browse/KAFKA-12693),
> > >> >> > > and as he points out there, you should be able to get these
> > >> >> > > semantics today by calling initTransactions() and then just
> > >> >> > > using the producer as normal (no
> > >> >> > > beginTransaction()/abortTransaction()/commitTransaction()).
> > >> >> >
> > >> >> > I tested this locally and was not met with success; transactional
> > >> >> > producers do a check right now to ensure that any calls to
> > >> >> > "KafkaProducer::send" occur within a transaction (see
> > >> >> > https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L957-L959
> > >> >> > and
> > >> >> > https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L450-L451).
> > >> >> > Not a blocker, just noting that we'd have to do some legwork to
> > >> >> > make this workable with the producer API.
> > >> >> >
> > >> >>
> > >> >> Ah, sorry, I should have actually tried it rather than just taking a
> > >> >> quick look at the code.
> > >> >>
> > >> >> Rather than remove those safety checks I suppose we'd need a way of
> > >> >> distinguishing, in the config, the difference in semantics, e.g.
> > >> >> something like a fencing.id config, which would be mutually
> > >> >> exclusive with transactional.id. Likewise perhaps initFencing()
> > >> >> alongside initTransactions() in the API. But I think at this point
> > >> >> it's something for another KIP.
> > >> >>
> > >> >> Kind regards,
> > >> >>
> > >> >> Tom
> > >> >>
> > >> >
> > >>
> > >
> >
>
