RE transaction boundaries:
> > First, the offset commit interval is currently 60 seconds by default, and
> > source connectors can produce a lot of records in that time. Until someone
> > operating a Connect cluster changes that to a much lower value, it's
> > possible that source connector performance could be significantly impacted.

> > At first blush, controlling transaction boundaries based upon batches
> > seems like it would work better for high throughput connectors.
> Sorry, what's the concern about performance here? Write throughput for a
> transactional producer doesn't suffer as the number of records per
> transaction increases. I'm also worried about source connectors that return
> small batches with high frequency; those would not fare well with a lower
> offset commit interval.

Recall that with Kafka transactions, a consumer in read_committed mode
buffers messages that are part of a transaction until that transaction is
committed or aborted. When a consumer sees an aborted transaction marker
for that transaction, the consumer discards all buffered messages
associated with that transaction. When a consumer sees a commit transaction
marker, it then forwards those buffered messages that are associated with
that transaction to its application. Note that the transaction markers and
buffered messages are per topic partition.
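
A toy Python model of the visibility rules just described may help. It models only what the application sees on one partition, not where the data is held, and it assumes a single transactional producer writing to that partition (as with one task's producer), so transactions never interleave. `ReadCommittedView` and its methods are made up for illustration and are not Kafka client APIs.

```python
from dataclasses import dataclass, field


@dataclass
class ReadCommittedView:
    """Toy model of what a read_committed application sees on one partition.

    Assumes a single transactional producer, so transactions never interleave.
    """
    pending: list = field(default_factory=list)    # written, not yet visible
    delivered: list = field(default_factory=list)  # visible to the application

    def on_record(self, value):
        self.pending.append(value)

    def on_commit_marker(self):
        # Commit marker: the whole transaction becomes visible at once.
        self.delivered.extend(self.pending)
        self.pending.clear()

    def on_abort_marker(self):
        # Abort marker: the transaction's records are never delivered.
        self.pending.clear()


view = ReadCommittedView()
view.on_record("a")
view.on_record("b")
print(view.delivered)  # [] while the transaction is still open
view.on_commit_marker()
view.on_record("c")
view.on_abort_marker()
print(view.delivered)  # ['a', 'b']
```

Nothing in `pending` reaches the application until the commit marker arrives, so the larger the transaction, the more records are held back at once.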

So I have two concerns with using time-based transaction boundaries.

The first is easier to understand: consumers using read_committed mode may
consume a lot of memory while buffering records that are part of active
transactions. In fact, the memory required is a function of the number of
records the transaction contains in the consumer's assigned topic partitions.
However, the only way a Connect user can control the number of records in
each transaction is by reducing the offset commit interval used for _all
connectors_. This means that the Connect user cannot directly limit the
size of a transaction (and therefore the buffering memory required by the
consumers), but can only limit its maximum duration. This seems
challenging at best, because the size of the transaction is a function of
both the throughput and the offset commit interval.
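
To put rough numbers on this, here is a small Python sketch. The two throughputs (10 and 50,000 records/s) and the 10,000-record cap are made-up illustrative values, and both helper functions are hypothetical; the only real figure is the 60-second default for `offset.flush.interval.ms`.

```python
def records_per_transaction(records_per_sec: float, commit_interval_ms: int) -> int:
    # With time-based boundaries, size is throughput times interval.
    return int(records_per_sec * commit_interval_ms / 1000)


def interval_for_max_records(max_records: int, records_per_sec: float) -> int:
    # The interval needed to cap transaction size depends on throughput.
    return int(max_records * 1000 / records_per_sec)


# 60 s is the default offset.flush.interval.ms.
print(records_per_transaction(10, 60_000))        # 600
print(records_per_transaction(50_000, 60_000))    # 3000000
print(interval_for_max_records(10_000, 50_000))   # 200
print(interval_for_max_records(10_000, 10))       # 1000000
```

Capping transactions at 10,000 records would need a 200 ms interval for the faster connector and roughly 17 minutes for the slower one, so no single cluster-wide interval fits both.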

My second concern is that applications consuming the topics to which a
source connector writes will perceive those writes as having potentially
very high lag relative to when the connector first saw the corresponding
information in the source system. The best case is that the source connector
discovers some information, generates a source record and hands it to
Connect, and Connect writes that record to the topic just before the offset
commit window closes and commits the transaction. The worst case is that the
source connector gives the record to Connect just after the offset commit
window closes (and the transaction is closed), so the transaction containing
that record will not be committed for another commit interval. This means
the worst-case *perceived lag* could be at least the offset commit interval.
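
A small Python sketch of the best and worst cases, under the simplifying assumption that commits start at fixed multiples of the interval. `perceived_lag_ms` is a hypothetical helper; any time spent flushing and committing (`commit_duration_ms`) adds to the lag, which is why the worst case is at least the interval.

```python
import math


def perceived_lag_ms(handoff_ms: float, commit_interval_ms: float,
                     commit_duration_ms: float = 0.0) -> float:
    """Delay from handing a record to Connect until its transaction commits.

    Simplification: commits start at fixed multiples of the interval, and a
    record handed off exactly at a boundary misses that commit.
    """
    next_boundary = (math.floor(handoff_ms / commit_interval_ms) + 1) * commit_interval_ms
    return next_boundary + commit_duration_ms - handoff_ms


T = 60_000  # default offset.flush.interval.ms
print(perceived_lag_ms(59_999, T))  # 1.0 (best case: just before the window closes)
print(perceived_lag_ms(60_000, T))  # 60000.0 (worst case: just after it closes)
```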

So if we agree that it will be difficult for some Connect users to choose a
one-size-fits-all offset commit interval that works well for source
connectors with a range of throughputs and different consumer application
requirements, we may need to consider letting each connector control the
transaction boundaries, albeit still somewhat indirectly. The question then
becomes how to specify the boundaries. Record counts alone are
insufficient, since a transaction could be 1 record short for some time,
resulting in the entire transaction timing out. Time/duration is also
insufficient, since for low perceived lag it should be set as small as is
feasible, which is poor for high throughput.

Using a separate transaction for each batch seems like a very worthwhile
compromise. First, it reuses a mechanism that connector implementations
already employ, and many connectors already have configs to tune batching
to maximize throughput. Second, it requires no additional configuration
properties or connector-specific behavior. Third, high-throughput
connectors are likely returning large batches, so write amplification for
the extra offset record per source partition is likely to have a much
smaller impact (assuming that most high-throughput connectors have high
records-per-source-partition ratios). Likewise, low-throughput connectors
will have either very small or infrequent batches, and can easily absorb
the higher write amplification (up to 2x: one offset record for every
record). Regardless of the throughput, infrequent batches would have higher
lag anyway even without EOS, and EOS transactions coupled to those batches
would add minimal overhead.
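
A quick way to check these write-amplification numbers is a small Python function. It assumes each `poll()` batch becomes one transaction containing the batch's records plus one offset record per distinct source partition in the batch, and it ignores transaction markers; `write_amplification`, the batches, and the table names are made up.

```python
def write_amplification(batch) -> float:
    """Kafka records written per source record when each poll() batch is its
    own transaction with one offset record per source partition in the batch
    (transaction markers not counted)."""
    offset_records = len({source_partition for source_partition, _ in batch})
    return (len(batch) + offset_records) / len(batch)


single = [("table-a", 1)]
bulk = [(f"table-{i % 4}", i) for i in range(1_000)]
print(write_amplification(single))  # 2.0
print(write_amplification(bulk))    # 1.004
```

A one-record batch hits the 2x worst case, while a 1,000-record batch spread over four source partitions adds well under 1%.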

> > It's true we already have that constraint now, but transactions, and more
> > importantly transaction size and latency, add a completely different
> > aspect to the calculation of how to set the offset commit interval for a
> > Connect cluster.

> > If we're committing transactions every 60 seconds (or even as frequently
> > as every 5 seconds), then the _perceived lag_ will be significantly higher
> > with transactions than without.
> That's a fair point. High-throughput connectors are likely to benefit from
> higher commit intervals so that they have to pause to commit offsets, and
> produce offset records, less frequently. Low-latency connectors are likely
> to benefit from lower commit intervals in order to shorten the time between
> source records being produced to Kafka and their transactions being
> committed. There's no reason to assume that there's a one-size-fits-all
> offset commit interval for an entire cluster once offset commit becomes a
> requirement for records to be visible to downstream consumers.
> > Have you considered having the worker create a separate transaction for
> > each batch of records returned by the connector?
> I had not considered this, but for reasons outlined above regarding
> performance with high-throughput connectors, I'm hesitant to put it into
> play by default.
> Given the conflicting requirements of high-throughput and low-latency
> connectors with regards to offset commit, I do agree that for some
> connectors, the most useful default behavior would be to commit offsets as
> soon as possible, and committing once for every batch returned from
> "SourceTask::poll" is a great way to allow that.
> The most straightforward way to accommodate these conflicting cases seems
> to be to just go ahead and allow per-connector offset commit intervals.
> That way, the user gets to define an approximate upper bound on the time
> between a record being produced and its transaction being committed--and if
> they want the upper bound to be zero, they can just configure their
> connector with an offset commit interval of zero. Thoughts?

Yes, per-connector offset commit intervals are one approach that would be
more explicit, though see my concerns earlier in this email about
controlling the number of records in a transaction by a time-based config,
even if set on a per-connector basis.
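
To make that concern concrete, here's a minimal sketch of a time-only commit policy as I understand the per-connector proposal. `CommitPolicy` is hypothetical (not a Connect class or KIP API), and it assumes the decision is checked once after each `poll()` batch.

```python
class CommitPolicy:
    """Hypothetical per-connector policy: after a poll() batch, commit the open
    transaction if at least interval_ms has passed since the last commit."""

    def __init__(self, interval_ms: int, now_ms: int = 0):
        self.interval_ms = interval_ms
        self.last_commit_ms = now_ms

    def should_commit(self, now_ms: int) -> bool:
        # Only elapsed time is considered; the record count is never checked.
        if now_ms - self.last_commit_ms >= self.interval_ms:
            self.last_commit_ms = now_ms
            return True
        return False


# Batches arrive every 100 ms.
per_batch = CommitPolicy(interval_ms=0)
timed = CommitPolicy(interval_ms=250)
print([per_batch.should_commit(t) for t in (100, 200, 300, 400)])  # [True, True, True, True]
print([timed.should_commit(t) for t in (100, 200, 300, 400)])      # [False, False, True, False]
```

With an interval of zero this degenerates to one transaction per batch; with any other value it bounds how long a transaction stays open, but not how many records it holds.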

> > It is true that low-throughput connectors would result in higher write
> > amplification (worst case being 2x when each batch returns a single
> > record), but for low-throughput connectors this might be an acceptable
> > tradeoff if EOS is really needed, or if not then EOS can be disabled for
> > this connector.
> Just to be clear--disabling exactly-once support for a connector is
> technically an option but would require setting up a dedicated
> "non-exactly-once" Connect cluster, since exactly-once will only be
> configurable on a per-worker level and not a per-connector level.

I'm not sure that setting up a separate Connect cluster is that practical
for many Connect users, especially in larger organizations where one group
manages the cluster and others manage connectors.

RE other comments:
> > This seems to contradict the earlier quote. Specifically, the first quote
> > above states that transaction boundaries are dictated by offset flushes,
> > which are controlled by the `offset.flush.interval.ms` property. However,
> > the second quote says the `offset.flush.timeout.ms` property will be
> > ignored for EOS source tasks. I must be missing something.
> One of these discusses the offset commit interval property
> (https://kafka.apache.org/27/documentation.html#connectconfigs_offset.flush.interval.ms),
> and the other discusses the offset commit timeout property
> (https://kafka.apache.org/27/documentation.html#connectconfigs_offset.flush.timeout.ms).
> They are separate properties and control separate things; hope this helps.

Okay, that makes sense. But boy is that subtle.

> > Does it seem realistic or practical to suggest to reduce the source
> > connector's throughput?
> I believe so, yes. Obviously this option should be reserved for last, but
> since the penalty for failure here is that the task is rendered completely
> unusable, a slower task seems preferable to a failed one. I've updated this
> section to both add two more alternatives (of tuning the producer for
> higher throughput and using a smaller offset commit interval) and to
> reorder things in descending order of preference.

Reducing throughput is a valid technical solution, especially if the only
alternative is that the task doesn't run. But IMO it's an impractical one,
since most users will want a real-time solution, and decreasing a
connector's throughput is the opposite of that.

The KIP says: "It may take longer than the transaction timeout for a task
to flush all of its records to Kafka." Might another option be to reduce
the number of records in the transaction?
Is it possible to eliminate transaction timeouts by reducing the size of
the transactions?

> I should also make it clear that this should occur very rarely in a setup
> with a reasonably-high transaction timeout. Today, if a task's producer is
> unable to keep up with the throughput of the records generated by the task,
> this leads to a situation where it becomes impossible to commit offsets for
> that task (due to https://issues.apache.org/jira/browse/KAFKA-12226); in
> practice, I have never seen this happen with a healthy Kafka cluster and a
> sufficiently-provisioned (enough tasks, uncrowded workers, etc.) connector.
> > Do you mean to use "task A" and "task B" here? Do they imply tasks from
> > the same connector?
> The scenario outlined here applies for both cases: the tasks could be from
> the same connector, or from different connectors.
> > If so, then won't the offsets from both of those tasks
> > still be written to the same connector-specific offset topic, and suffer
> > from the potential blocking issue mentioned above? While it's useful to
> > call this out, I'm not sure how that example helps support the motivation
> > for a separate per-connector offset topic. OTOH, if these were tasks from
> > _different_ connectors, then it becomes clear that the offsets from one
> > source connector using a connector-specific offsets topic will never block
> > the offsets from another connector using a different connector-specific
> > offsets topic. Thus, having each connector use separate connector-specific
> > offset topics at least avoids the problem of one connector's tasks blocking
> > the tasks of the other connector just because of transaction commit timeout
> > issues.
> I believe this is already touched on in the third paragraph of the
> "Per-connector offsets topics"/"Motivation" section:
> > This way, although tasks of the same connector may still interfere with
> > each other, they will at least not interfere with tasks of other
> > connectors.

> > It's implied but not explicitly stated that this will result in
> > duplicating the offsets for EOS source connectors: the workers will
> > continue to track all source offsets in their own offsets topic, and
> > EOS-enabled source connectors will also track their own offsets in
> > connector-specific offsets topics. It might be worth making that more
> > obvious, and stating which will be used upon connector restarts.
> I believe the duplication is already touched on in the "Offset (and record)
> writes" section:
> > Once an offset commit is complete, if the connector is (implicitly or
> > explicitly) configured with a separate offsets topic, the committed offsets
> > will also be written to the worker’s global offsets topic using a
> > non-transactional producer and the worker’s principal.
> And the precedence of the various topics is described in detail in the
> "Per-connector offsets topics"/"Smooth migration" section (TL;DR: if an
> offset is present in a connector's separate offsets topic, it's used; if
> it's not found, then the global offsets topic is searched as a fallback).
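
The precedence rule summarized above, as a minimal Python sketch. `read_offset` and the dicts standing in for the two topics are hypothetical; in Connect, source partitions and offsets are maps, simplified here to strings and dicts, and each topic is reduced to its latest value per key.

```python
from typing import Mapping, Optional


def read_offset(source_partition: str,
                connector_offsets: Mapping[str, dict],
                global_offsets: Mapping[str, dict]) -> Optional[dict]:
    # Prefer the connector's own offsets topic...
    if source_partition in connector_offsets:
        return connector_offsets[source_partition]
    # ...and fall back to the worker's global offsets topic.
    return global_offsets.get(source_partition)


connector_topic = {"table-a": {"position": 42}}
global_topic = {"table-a": {"position": 40}, "table-b": {"position": 7}}
print(read_offset("table-a", connector_topic, global_topic))  # {'position': 42}
print(read_offset("table-b", connector_topic, global_topic))  # {'position': 7}
print(read_offset("table-c", connector_topic, global_topic))  # None
```

Because the global copy is written after the connector-specific commit, it can lag behind; the connector-specific value wins whenever both exist.
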
> > Are you suggesting that the source connector and task implementations be
> > responsible for this? Or are you suggesting that the worker will be
> > responsible for creating the offsets topics on behalf of source connector
> > and task implementations (using an admin client per the configurations),
> > just like the worker currently does so for the cluster's offsets topic?
> > Please clarify this in the KIP.
> Sure, done.
> > Do you think it's worthwhile adding a parenthetical clause after this
> > statement that says "(either `exactly.once.source.enabled=false` or an
> > older Connect worker that does not have EOS support for source
> > connectors)"? Yes, those are called out above, but IMO this would help
> > clarify exactly what this paragraph means for operators -- and how it is
> > not a burden for them to avoid this condition.
> Sure, done.
> > In the "Connector principal permissions" section, the "IdempotentWrite"
> > row says "Kafka cluster targeted by the Connect cluster" for the resource
> > name column. Is this right? Should that instead be "Kafka cluster to which
> > the Connector writes"?
> Good catch, done.
> > In the "Rolling upgrade(s)" section, should the first and second rolling
> > upgrades be completed before enabling EOS on any source connector? If so,
> > should this section call this out in more detail?
> I'm not sure I understand the question? Exactly-once source support is only
> configurable on a per-worker basis and, when enabled, will apply to all
> connectors. So the steps provided in the "Rolling upgrade(s)" section are
> sufficient to get every connector on your cluster running with exactly-once
> enabled, and there is no need or way to enable exactly-once on a
> per-connector basis.
> > How does this work with a rolling "downgrade"? The "Heterogeneous
> > clusters" subsection of the "Permitted failure scenarios" section talks
> > about (I think) how setting `exactly.once.source.enabled=false` for one
> > worker effectively breaks the EOS delivery guarantees for the whole
> > Connect cluster. If that is the case, how does this rolling downgrade
> > work? Would it simply require that all source connectors configured to
> > use EOS be changed to not use EOS, and then to do a rolling downgrade of
> > the worker's configuration to disable EOS? If this is the case, let's be
> > more explicit in this section on downgrading.
> > In the "Soft downgrade" section, it sounds like this is effectively not a
> > rolling downgrade. Nothing in the "Downgrades" section really implies that
> > rolling downgrades are not supported, but at the same time it's not
> > terribly obvious.
> > Overall, it might be worth considering (if you haven't already) modifying
> > the Downgrade section to be a bit more prescriptive in terms of steps that
> > should be taken.
> I've touched up the sections that touch on downgrades, to make it clear how
> to safely perform a rolling downgrade for both "hard" and "soft"
> downgrades, and what the consequences of performing an unsafe rolling
> "hard" downgrade are and how to deal with the consequences. Hopefully this,
> coupled with the knowledge that exactly-once is not going to be
> configurable on a per-connector basis, should make things clear.
> > Finally, do we need to expose any metrics that are related to maybe the
> > average number of records in each transaction? Or should we rely solely
> > upon existing producer metrics?
> That's a great point--after perusing
> https://kafka.apache.org/28/documentation.html#producer_monitoring it
> doesn't look like there are any metrics on producer transaction size. I've
> updated the KIP to add source task metrics for minimum, maximum, and
> average transaction size. I think that this, in conjunction with existing
> metrics for poll time, commit time, batch size (per call to
> "SourceTask::poll"), and active record count, should cover our bases.
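
A rough Python sketch of what the three transaction-size metrics would compute for one task. `TransactionSizeStats` is hypothetical; Kafka's metrics library computes min/max/avg over sampled time windows, whereas this keeps lifetime totals for simplicity.

```python
from typing import Optional


class TransactionSizeStats:
    """Tracks min/max/avg transaction size (in records) for one task."""

    def __init__(self) -> None:
        self.count = 0
        self.total = 0
        self.min: Optional[int] = None
        self.max: Optional[int] = None

    def record(self, size: int) -> None:
        # Called once per committed transaction.
        self.count += 1
        self.total += size
        self.min = size if self.min is None else min(self.min, size)
        self.max = size if self.max is None else max(self.max, size)

    @property
    def avg(self) -> float:
        return self.total / self.count if self.count else 0.0


stats = TransactionSizeStats()
for size in (1, 500, 99):
    stats.record(size)
print(stats.min, stats.max, stats.avg)  # 1 500 200.0
```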

Thanks again!


