Hi Sagar,

Thanks for updating the KIP! The latest draft seems simpler and more
focused, which I think is a win for users and developers alike. Here are my
thoughts on the current draft:

1. (Nit) Can we move the "Public Interfaces" section before the "Proposed
Changes" section? It's nice to have a summary of the user/developer-facing
changes first since that answers many of the questions that I had while
reading the "Proposed Changes" section. I'd bet that this is also why we
use that ordering in the KIP template.

2. Why are we invoking SourceTask::updateOffsets so frequently when
exactly-once support is disabled? Wouldn't it be simpler both for our
implementation and for connector developers if we only invoked it directly
before committing offsets, instead of potentially several times between
offset commits, especially since that would also mirror the behavior with
exactly-once support enabled?

3. Building off of point 2, we wouldn't need to specify any more detail
than that "SourceTask::updateOffsets will be invoked directly before
committing offsets, with the to-be-committed offsets". There would be no
need to distinguish between when exactly-once support is enabled or
disabled.

4. Some general stylistic feedback: we shouldn't mention the names of
internal classes or methods in KIPs. KIPs are for discussing high-level
design proposals. Internal names and APIs may change over time, and are not
very helpful to readers who are not already familiar with the code base.
Instead, we should describe changes in behavior, not code.

5. Why return a complete map of to-be-committed offsets instead of a map of
just the offsets that the connector wants to change? The latter seems
especially intuitive since we automatically re-insert source partitions that
have been removed by the connector.

6. I don't think we need to return an Optional from
SourceTask::updateOffsets. Developers can return null instead of
Optional.empty(), and since the framework will have to handle null return
values either way, this would reduce the number of cases for us to handle
from three (Optional.of(...), Optional.empty(), null) to two (null,
non-null).

7. Why disallow tombstone records? If an upstream resource disappears, then
wouldn't a task want to emit a tombstone record without having to also emit
an accompanying source record? This could help prevent an
infinitely-growing offsets topic, although with KIP-875 coming out in the
next release, perhaps we can leave this out for now and let Connect users
and cluster administrators do this work manually instead of letting
connector developers automate it.

8. Is the information on multiple offsets topics for exactly-once
connectors relevant to this KIP? If not, we should remove it.

9. It seems like most of the use cases that motivate this KIP only require
being able to add a new source partition/source offset pair to the
to-be-committed offsets. Do we need to allow connector developers to modify
source offsets for already-present source partitions at all? If we reduce
the surface of the API, then the worst case is still just that the offsets
we commit are at most one commit out-of-date.

10. (Nit) The "Motivation" section states that "offsets are written
periodically by the connect framework to an offsets topic". This is only
true in distributed mode; in standalone mode, we write offsets to a local
file.
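
To make points 2, 5, and 6 a bit more concrete, here is a rough,
self-contained Java sketch of the merge behavior I have in mind. It is only
an illustration under my own assumptions: OffsetUpdater, offsetsToCommit, and
single are hypothetical names made up for this email, not part of the KIP or
of the Connect API, and tombstones (point 7) are deliberately left out.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UpdateOffsetsSketch {

    // Hypothetical stand-in for the proposed hook; NOT the real SourceTask API.
    interface OffsetUpdater {
        // Returns only the partitions whose offsets should change, or null for "no changes".
        Map<Map<String, Object>, Map<String, Object>> updateOffsets(
                Map<Map<String, Object>, Map<String, Object>> toBeCommitted);
    }

    // Hypothetical framework-side helper, invoked directly before an offset commit.
    static Map<Map<String, Object>, Map<String, Object>> offsetsToCommit(
            Map<Map<String, Object>, Map<String, Object>> toBeCommitted,
            OffsetUpdater task) {
        // Pass a read-only view so the task cannot mutate the argument in place.
        Map<Map<String, Object>, Map<String, Object>> updates =
                task.updateOffsets(Collections.unmodifiableMap(toBeCommitted));

        // Start from the offsets the framework already planned to commit...
        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>(toBeCommitted);
        // ...and overlay the task's changes; null simply means "nothing to change".
        if (updates != null) {
            result.putAll(updates);
        }
        return result;
    }

    // Small helper to build single-entry partition/offset maps.
    static Map<String, Object> single(String key, Object value) {
        return Collections.singletonMap(key, value);
    }

    public static void main(String[] args) {
        Map<String, Object> fileA = single("file", "a.csv");
        Map<String, Object> fileB = single("file", "b.csv");

        Map<Map<String, Object>, Map<String, Object>> pending = new HashMap<>();
        pending.put(fileA, single("line", 10L));

        // Task adds an offset for a partition that produced no records.
        OffsetUpdater addsFileB = offsets -> Collections.singletonMap(fileB, single("line", 0L));
        System.out.println(offsetsToCommit(pending, addsFileB).size()); // prints 2

        // Task has nothing to change and returns null.
        OffsetUpdater noChange = offsets -> null;
        System.out.println(offsetsToCommit(pending, noChange).equals(pending)); // prints true
    }
}
```

With this shape, the framework only has two cases to handle (null and
non-null), and a task that just wants to add a new source partition returns
a single-entry map instead of a copy of everything.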

Cheers,

Chris

On Tue, Jul 4, 2023 at 8:42 AM Yash Mayya <yash.ma...@gmail.com> wrote:

> Hi Sagar,
>
> Thanks for your continued work on this KIP! Here are my thoughts on your
> updated proposal:
>
> 1) In the proposed changes section where you talk about modifying the
> offsets, could you please clarify that tasks shouldn't modify the offsets
> map that is passed as an argument? Currently, the distinction between the
> offsets map passed as an argument and the offsets map that is returned is
> not very clear in numerous places.
>
> 2) The default return value of Optional.empty() seems to be fairly
> non-intuitive considering that the return value is supposed to be the
> offsets that are to be committed. Can we consider simply returning the
> offsets argument itself by default instead?
>
> 3) The KIP states that "It is also possible that a task might choose to
> send a tombstone record as an offset. This is not recommended and to
> prevent connectors shooting themselves in the foot due to this" - could you
> please clarify why this is not recommended / supported?
>
> 4) The KIP states that "If a task returns an Optional of a null object or
> an Optional of an empty map, even for such cases the behaviour would would
> be disabled." - since this is an optional API that source task
> implementations don't necessarily need to implement, I don't think I fully
> follow why the return type of the proposed "updateOffsets" method is an
> Optional? Can we not simply use the Map as the return type instead?
>
> 5) The KIP states that "The offsets passed to the updateOffsets  method
> would be the offset from the latest source record amongst all source
> records per partition. This way, if the source offset for a given source
> partition is updated, that offset is the one that gets committed for the
> source partition." - we should clarify that the "latest" offset refers to
> the offsets that are about to be committed, and not the latest offsets
> returned from SourceTask::poll so far (see related discussion in
> https://issues.apache.org/jira/browse/KAFKA-15091 and
> https://issues.apache.org/jira/browse/KAFKA-5716).
>
> 6) We haven't used the terminology of "Atleast Once Semantics" elsewhere in
> Connect since the framework itself does not (and cannot) make any
> guarantees on the delivery semantics. Depending on the source connector and
> the source system, both at-least once and at-most once semantics (for
> example - a source system where reads are destructive) are possible. We
> should avoid introducing this terminology in the KIP and instead refer to
> this scenario as exactly-once support being disabled.
>
> 7) Similar to the above point, we should remove the use of the term
> "Exactly Once Semantics" and instead refer to exactly-once support being
> enabled since the framework can't guarantee exactly-once semantics for all
> possible source connectors (for example - a message queue source connector
> where offsets are essentially managed in the source system via an ack
> mechanism).
>
> 8) In a previous attempt to fix this gap in functionality, a significant
> concern was raised on offsets ordering guarantees when we retry sending a
> batch of records (ref -
> https://github.com/apache/kafka/pull/5553/files#r213329307). It doesn't
> look like this KIP addresses that concern either? In the case where
> exactly-once support is disabled - if we update the committableOffsets with
> the offsets provided by the task through the new updateOffsets method,
> these offsets could be committed before older "regular" offsets are
> committed due to producer retries which could then lead to an inconsistency
> if the send operation eventually succeeds.
>
> 9) The KIP states that when exactly-once support is enabled, the new
> SourceTask::updateOffsets method will be invoked only when an offset flush
> is attempted. If the connector is configured to use a connector specified
> transaction boundary rather than a poll or interval based boundary, isn't
> it possible that we don't call SourceTask::updateOffsets until there are
> actual records that are also being returned through poll (which would
> defeat the primary motivation of the KIP)? Or are we making the assumption
> that the connector defined transaction boundary should handle this case
> appropriately if needed (i.e. source tasks should occasionally request for
> a transaction commit via their transaction context if they want offsets to
> be committed without producing records)? If so, I think we should
> explicitly call that out in the KIP.
>
> 10) The Javadoc for SourceTask::updateOffsets in the section on public
> interfaces also has the same issue with the definition of latest offsets
> that I've mentioned above (latest offsets from poll versus latest offsets
> that are about to be committed).
>
> 11) The Javadoc for SourceTask::updateOffsets also introduces the same
> confusion w.r.t updating offsets that I've mentioned above (modifying the
> offsets map argument versus returning a modified copy of the offsets map).
>
> 12) In the section on compatibility, we should explicitly mention that
> connectors which implement the new method will still be compatible with
> older Connect runtimes where the method will simply not be invoked.
>
>
> Thanks,
> Yash
>
> On Wed, Jun 21, 2023 at 10:25 PM Sagar <sagarmeansoc...@gmail.com> wrote:
>
> > Hi All,
> >
> > I have created this PR: https://github.com/apache/kafka/pull/13899 which
> > implements the approach outlined in the latest version of the KIP. I
> > thought I could use this to validate the approach based on my
> > understanding while the KIP itself gets reviewed. I can always change the
> > implementation once we move to a final decision on the KIP.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Wed, Jun 14, 2023 at 4:59 PM Sagar <sagarmeansoc...@gmail.com> wrote:
> >
> > > Hey All,
> > >
> > > > Bumping this discussion thread again to see what you think of the
> > > > modified KIP.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > > > On Mon, May 29, 2023 at 8:12 PM Sagar <sagarmeansoc...@gmail.com> wrote:
> > >
> > >> Hi,
> > >>
> > >> Bumping this thread again for further reviews.
> > >>
> > >> Thanks!
> > >> Sagar.
> > >>
> > >> On Fri, May 12, 2023 at 3:38 PM Sagar <sagarmeansoc...@gmail.com> wrote:
> > >>
> > >>> Hi All,
> > >>>
> > >>> Thanks for the comments/reviews. I have updated the KIP
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
> > >>> with a newer approach which shelves the need for an explicit topic.
> > >>>
> > >>> Please review again and let me know what you think.
> > >>>
> > >>> Thanks!
> > >>> Sagar.
> > >>>
> > >>>
> > >>> On Mon, Apr 24, 2023 at 3:35 PM Yash Mayya <yash.ma...@gmail.com> wrote:
> > >>>
> > >>>> Hi Sagar,
> > >>>>
> > >>>> Thanks for the KIP! I have a few questions and comments:
> > >>>>
> > >>>> 1) I agree with Chris' point about the separation of a connector
> > >>>> heartbeat mechanism and allowing source connectors to generate offsets
> > >>>> without producing data. What is the purpose of the heartbeat topic here
> > >>>> and are there any concrete use cases for downstream consumers on this
> > >>>> topic? Why can't we instead simply introduce a mechanism to retrieve a
> > >>>> list of source partition / source offset pairs from the source tasks?
> > >>>>
> > >>>> 2) With the currently described mechanism, the new
> > >>>> "SourceTask::produceHeartbeatRecords" method returns a
> > >>>> "List<SourceRecord>" - what happens with the topic in each of these
> > >>>> source records? Chris pointed this out above, but it doesn't seem to
> > >>>> have been addressed? The "SourceRecord" class also has a bunch of other
> > >>>> fields which will be irrelevant here (partition, key / value schema,
> > >>>> key / value data, timestamp, headers). In fact, it seems like only the
> > >>>> source partition and source offset are relevant here, so we should
> > >>>> either introduce a new abstraction or simply use a data structure like
> > >>>> a mapping from source partitions to source offsets (adds to the above
> > >>>> point)?
> > >>>>
> > >>>> 3) I'm not sure I fully follow why the heartbeat timer / interval is
> > >>>> needed? What are the downsides of calling
> > >>>> "SourceTask::produceHeartbeatRecords" in every execution loop (similar
> > >>>> to the existing "SourceTask::poll" method)? Is this only to prevent the
> > >>>> generation of a lot of offset records? Since Connect's offsets topics
> > >>>> are log compacted (and source partitions are used as keys for each
> > >>>> source offset), I'm not sure if such concerns are valid and such a
> > >>>> heartbeat timer / interval mechanism is required?
> > >>>>
> > >>>> 4) The first couple of rejected alternatives state that the use of a
> > >>>> null topic / key / value are preferably avoided - but the current
> > >>>> proposal would also likely require connectors to use such workarounds
> > >>>> (null topic when the heartbeat topic is configured at a worker level
> > >>>> and always for the key / value)?
> > >>>>
> > >>>> 5) The third rejected alternative talks about subclassing the
> > >>>> "SourceRecord" class - this presumably means allowing connectors to
> > >>>> pass special offset only records via the existing poll mechanism? Why
> > >>>> was this considered a more invasive option? Was it because of the
> > >>>> backward compatibility issues that would be introduced for plugins
> > >>>> using the new public API class that still need to be deployed onto
> > >>>> older Connect workers?
> > >>>>
> > >>>> Thanks,
> > >>>> Yash
> > >>>>
> > >>>> On Fri, Apr 14, 2023 at 6:45 PM Sagar <sagarmeansoc...@gmail.com> wrote:
> > >>>>
> > >>>> > One thing I forgot to mention in my previous email was that the reason
> > >>>> > I chose to include the opt-in behaviour via configs was that the users
> > >>>> > of the connector know their workload patterns. If the workload is such
> > >>>> > that the connector would receive regular valid updates then there’s
> > >>>> > ideally no need for moving offsets since it would update automatically.
> > >>>> >
> > >>>> > This way they aren’t forced to use this feature and can use it only
> > >>>> > when the workload is expected to be batchy or not frequent.
> > >>>> >
> > >>>> > Thanks!
> > >>>> > Sagar.
> > >>>> >
> > >>>> >
> > >>>> > On Fri, 14 Apr 2023 at 5:32 PM, Sagar <sagarmeansoc...@gmail.com> wrote:
> > >>>> >
> > >>>> > > Hi Chris,
> > >>>> > >
> > >>>> > > Thanks for following up on the response. Sharing my thoughts
> > >>>> further:
> > >>>> > >
> > >>>> > > If we want to add support for connectors to emit offsets without
> > >>>> > >> accompanying source records, we could (and IMO should) do that
> > >>>> without
> > >>>> > >> requiring users to manually enable that feature by adjusting
> > >>>> worker or
> > >>>> > >> connector configurations.
> > >>>> > >
> > >>>> > >
> > >>>> > > With the current KIP design, I have tried to implement this in an
> > >>>> > > opt-in manner via configs. I guess what you are trying to say is
> > >>>> > > that this doesn't need a config of its own and instead could be part
> > >>>> > > of the poll -> transform etc -> produce -> commit cycle. That way,
> > >>>> > > the users don't need to set any config and if the connector supports
> > >>>> > > moving offsets w/o producing SourceRecords, it should happen
> > >>>> > > automatically. Is that correct? If that is the concern, then I can
> > >>>> > > think of not exposing a config and try to make this process
> > >>>> > > automatic. That should ease the load on connector users, but I am
> > >>>> > > still not sure how to address your point about cognitive load on
> > >>>> > > connector developers. The offsets are privy to a connector and the
> > >>>> > > framework at best can provide hooks to the tasks to update their
> > >>>> > > offsets. Connector developers would still have to consider all cases
> > >>>> > > before updating offsets. And if I ignore the heartbeat topic and
> > >>>> > > heartbeat interval ms configs, then what the KIP proposes currently
> > >>>> > > isn't much different in that regard. Just that it produces a List of
> > >>>> > > SourceRecord which can be changed to a Map of SourcePartition and
> > >>>> > > their offsets if you think that would simplify things. Are there
> > >>>> > > other cases in your mind which need addressing?
> > >>>> > >
> > >>>> > > Here's my take on the usecases:
> > >>>> > >
> > >>>> > >    1. Regarding the example about SMTs with object storage based
> > >>>> > >    connectors, it was one of the scenarios identified. We have some
> > >>>> > >    connectors that rely on the offsets topic to check if the next
> > >>>> > >    batch of files should be processed, and because the last record
> > >>>> > >    from a file is filtered out, the EOF is supposedly never reached
> > >>>> > >    and the connector can't commit offsets for that source partition
> > >>>> > >    (file). If there was a mechanism to update offsets for such a
> > >>>> > >    source file, then with some moderately complex state tracking,
> > >>>> > >    the connector can mark that file as processed and proceed.
> > >>>> > >    2. There's another use case with the same class of connectors
> > >>>> > >    where if a file is malformed, then the connector couldn't produce
> > >>>> > >    any offsets because the file couldn't get processed completely.
> > >>>> > >    To handle such cases, the connector developers have introduced a
> > >>>> > >    dev/null sort of topic where they produce a record to this
> > >>>> > >    corrupted file topic and move the offset somehow. This topic
> > >>>> > >    ideally isn't needed, and a mechanism to update offsets would
> > >>>> > >    have helped in this case as well.
> > >>>> > >    3. Coming to CDC based connectors,
> > >>>> > >       1. We had a similar issue with Oracle CDC source connector and
> > >>>> > >       needed to employ the same heartbeat mechanism to get around it.
> > >>>> > >       2. MongoDB CDC source connector has employed the same
> > >>>> > >       heartbeat mechanism. Check `heartbeat.interval.ms` here (
> > >>>> > >       https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/error-handling/
> > >>>> > >       ).
> > >>>> > >       3. Another CDC connector for ScyllaDB employs a similar
> > >>>> > >       mechanism.
> > >>>> > >       https://github.com/scylladb/scylla-cdc-source-connector/search?q=heartbeat
> > >>>> > >       4. For CDC based connectors, you could argue that these
> > >>>> > >       connectors have been able to solve this issue, so why do we
> > >>>> > >       need framework level support? But the point I am trying to
> > >>>> > >       make is that this limitation from the framework is forcing CDC
> > >>>> > >       connector developers to implement per-connector
> > >>>> > >       solutions/hacks (at times). And there could always be more CDC
> > >>>> > >       connectors in the pipeline forcing them to take a similar
> > >>>> > >       route as well.
> > >>>> > >    4. There's also a case at times with CDC source connectors which
> > >>>> > >    are REST API / web service based (Zendesk Source Connector for
> > >>>> > >    example). These connectors typically use timestamps from the
> > >>>> > >    responses as offsets. If there's a long period of inactivity
> > >>>> > >    wherein the API invocations don't return any data, then the
> > >>>> > >    offsets won't move and the connector would keep using the same
> > >>>> > >    timestamp that it received from the last non-empty response. If
> > >>>> > >    this period of inactivity keeps growing, and the API imposes any
> > >>>> > >    limits on how far back we can go in terms of window start, then
> > >>>> > >    this could potentially be a problem. In this case even though the
> > >>>> > >    connector was caught up with all the responses, it may need to
> > >>>> > >    snapshot again. In this case updating offsets can easily help
> > >>>> > >    since all the connector needs to do is to move the timestamp
> > >>>> > >    which would move the offset inherently.
> > >>>> > >
> > >>>> > > I still believe that this is something the framework should
> > support
> > >>>> OOB
> > >>>> > > irrespective of whether the connectors have been able to get
> > around
> > >>>> this
> > >>>> > > restriction or not.
> > >>>> > >
> > >>>> > > Lastly, about your comments here:
> > >>>> > >
> > >>>> > > I'm also not sure that it's worth preserving the current
> behavior
> > >>>> that
> > >>>> > >> offsets for records that have been filtered out via SMT are not
> > >>>> > committed.
> > >>>> > >
> > >>>> > >
> > >>>> > > Let me know if we need a separate JIRA to track this? This
> somehow
> > >>>> didn't
> > >>>> > > look related to this discussion.
> > >>>> > >
> > >>>> > > Thanks!
> > >>>> > > Sagar.
> > >>>> > >
> > >>>> > >
> > >>>> > > On Wed, Apr 12, 2023 at 9:34 PM Chris Egerton
> > >>>> <chr...@aiven.io.invalid>
> > >>>> > > wrote:
> > >>>> > >
> > >>>> > >> Hi Sagar,
> > >>>> > >>
> > >>>> > >> I'm sorry, I'm still not convinced that this design solves the
> > >>>> > problem(s)
> > >>>> > >> it sets out to solve in the best way possible. I tried to
> > >>>> highlight this
> > >>>> > >> in
> > >>>> > >> my last email:
> > >>>> > >>
> > >>>> > >> > In general, it seems like we're trying to solve two
> completely
> > >>>> > different
> > >>>> > >> problems with this single KIP: adding framework-level support
> for
> > >>>> > emitting
> > >>>> > >> heartbeat records for source connectors, and allowing source
> > >>>> connectors
> > >>>> > to
> > >>>> > >> emit offsets without also emitting source records. I don't mind
> > >>>> > addressing
> > >>>> > >> the two at the same time if the result is elegant and doesn't
> > >>>> compromise
> > >>>> > >> on
> > >>>> > >> the solution for either problem, but that doesn't seem to be
> the
> > >>>> case
> > >>>> > >> here.
> > >>>> > >> Of the two problems, could we describe one as the primary and
> one
> > >>>> as the
> > >>>> > >> secondary? If so, we might consider dropping the secondary
> > problem
> > >>>> from
> > >>>> > >> this KIP and addressing it separately.
> > >>>> > >>
> > >>>> > >> If we wanted to add support for heartbeat records, we could
> (and
> > >>>> IMO
> > >>>> > >> should) do that without requiring connectors to implement any
> new
> > >>>> > methods
> > >>>> > >> and only require adjustments to worker or connector
> > configurations
> > >>>> by
> > >>>> > >> users
> > >>>> > >> in order to enable that feature.
> > >>>> > >>
> > >>>> > >> If we want to add support for connectors to emit offsets
> without
> > >>>> > >> accompanying source records, we could (and IMO should) do that
> > >>>> without
> > >>>> > >> requiring users to manually enable that feature by adjusting
> > >>>> worker or
> > >>>> > >> connector configurations.
> > >>>> > >>
> > >>>> > >>
> > >>>> > >> I'm also not sure that it's worth preserving the current
> behavior
> > >>>> that
> > >>>> > >> offsets for records that have been filtered out via SMT are not
> > >>>> > committed.
> > >>>> > >> I can't think of a case where this would be useful and there
> are
> > >>>> > obviously
> > >>>> > >> plenty where it isn't. There's also a slight discrepancy in how
> > >>>> these
> > >>>> > >> kinds
> > >>>> > >> of records are treated by the Connect runtime now; if a record
> is
> > >>>> > dropped
> > >>>> > >> because of an SMT, then its offset isn't committed, but if it's
> > >>>> dropped
> > >>>> > >> because exactly-once support is enabled and the connector chose
> > to
> > >>>> abort
> > >>>> > >> the batch containing the record, then its offset is still
> > >>>> committed.
> > >>>> > After
> > >>>> > >> thinking carefully about the aborted transaction behavior, we
> > >>>> realized
> > >>>> > >> that
> > >>>> > >> it was fine to commit the offsets for those records, and I
> > believe
> > >>>> that
> > >>>> > >> the
> > >>>> > >> same logic can be applied to any record that we're done trying
> to
> > >>>> send
> > >>>> > to
> > >>>> > >> Kafka (regardless of whether it was sent correctly, dropped due
> > to
> > >>>> > >> producer
> > >>>> > >> error, filtered via SMT, etc.).
> > >>>> > >>
> > >>>> > >> I also find the file-based source connector example a little
> > >>>> confusing.
> > >>>> > >> What about that kind of connector causes the offset for the
> last
> > >>>> record
> > >>>> > of
> > >>>> > >> a file to be treated differently? Is there anything different
> > about
> > >>>> > >> filtering that record via SMT vs. dropping it altogether
> because
> > >>>> of an
> > >>>> > >> asynchronous producer error with "errors.tolerance" set to
> "all"?
> > >>>> And
> > >>>> > >> finally, how would such a connector use the design proposed
> here?
> > >>>> > >>
> > >>>> > >> Finally, I don't disagree that if there are other legitimate
> use
> > >>>> cases
> > >>>> > >> that
> > >>>> > >> would be helped by addressing KAFKA-3821, we should try to
> solve
> > >>>> that
> > >>>> > >> issue
> > >>>> > >> in the Kafka Connect framework instead of requiring individual
> > >>>> > connectors
> > >>>> > >> to implement their own solutions. But the cognitive load added
> by
> > >>>> the
> > >>>> > >> design proposed here, for connector developers and Connect
> > cluster
> > >>>> > >> administrators alike, costs too much to justify by pointing to
> an
> > >>>> > >> already-solved problem encountered by a single group of
> > connectors
> > >>>> > (i.e.,
> > >>>> > >> Debezium). This is why I think it's crucial that we identify
> > >>>> realistic
> > >>>> > >> cases where this feature would actually be useful, and right
> > now, I
> > >>>> > don't
> > >>>> > >> think any have been provided (at least, not ones that have
> > already
> > >>>> been
> > >>>> > >> addressed or could be addressed with much simpler changes).
> > >>>> > >>
> > >>>> > >> Cheers,
> > >>>> > >>
> > >>>> > >> Chris
> > >>>> > >>
> > >>>> > >> On Tue, Apr 11, 2023 at 7:30 AM Sagar <
> sagarmeansoc...@gmail.com
> > >
> > >>>> > wrote:
> > >>>> > >>
> > >>>> > >> > Hi Chris,
> > >>>> > >> >
> > >>>> > >> > Thanks for your detailed feedback!
> > >>>> > >> >
> > >>>> > >> > nits: I have taken care of them now. Thanks for pointing
> those
> > >>>> out.
> > >>>> > >> >
> > >>>> > >> > non-nits:
> > >>>> > >> >
> > >>>> > >> > 6) It seems (based on both the KIP and discussion on
> > KAFKA-3821)
> > >>>> that
> > >>>> > >> the
> > >>>> > >> > > only use case for being able to emit offsets without also
> > >>>> emitting
> > >>>> > >> source
> > >>>> > >> > > records that's been identified so far is for CDC source
> > >>>> connectors
> > >>>> > >> like
> > >>>> > >> > > Debezium.
> > >>>> > >> >
> > >>>> > >> >
> > >>>> > >> > I am aware of at least one more case where the non production
> of
> > >>>> > offsets
> > >>>> > >> > (due to non production of records) leads to the failure of
> > >>>> connectors
> > >>>> > >> when
> > >>>> > >> > the source purges the records of interest. This happens in
> File
> > >>>> based
> > >>>> > >> > source connectors (like S3/blob storage) in which if the
> last
> > >>>> record
> > >>>> > >> from
> > >>>> > >> > a file is filtered due to an SMT, then that particular file
> is
> > >>>> never
> > >>>> > >> > committed to the source partition and eventually when the
> file
> > is
> > >>>> > >> deleted
> > >>>> > >> > from the source and the connector is restarted due to some
> > >>>> reason, it
> > >>>> > >> > fails.
> > >>>> > >> > Moreover, I feel the reason this support should be there in
> the
> > >>>> Kafka
> > >>>> > >> > Connect framework is because this is a restriction of the
> > >>>> framework
> > >>>> > and
> > >>>> > >> > today the framework provides no support for getting around
> this
> > >>>> > >> limitation.
> > >>>> > >> > Every connector has its own way of handling offsets and
> having
> > >>>> each
> > >>>> > >> > connector handle this restriction in its own way can make it
> > >>>> complex.
> > >>>> > >> > Whether we choose to do it the way this KIP prescribes or any
> > >>>> other
> > >>>> > way
> > >>>> > >> is
> > >>>> > >> > up for debate but IMHO, the framework should provide a way of
> > >>>> > >> > getting around this limitation.
> > >>>> > >> >
> > >>>> > >> > 7. If a task produces heartbeat records and source records
> that
> > >>>> use
> > >>>> > the
> > >>>> > >> > > same source partition, which offset will ultimately be
> > >>>> committed?
> > >>>> > >> >
> > >>>> > >> >
> > >>>> > >> > The idea is to add the records returned by the
> > >>>> > `produceHeartbeatRecords`
> > >>>> > >> > to  the same `toSend` list within
> > >>>> `AbstractWorkerSourceTask#execute`.
> > >>>> > >> The
> > >>>> > >> > `produceHeartbeatRecords` would be invoked before we make the
> > >>>> `poll`
> > >>>> > >> call.
> > >>>> > >> > Hence, the offsets committed would be in the same order in
> > which
> > >>>> they
> > >>>> > >> would
> > >>>> > >> > be written. Note that, the onus is on the Connector
> > >>>> implementation to
> > >>>> > >> not
> > >>>> > >> > return records which can lead to data loss or data going out
> of
> > >>>> order.
> > >>>> > >> The
> > >>>> > >> > framework would just commit based on whatever is supplied.
> > Also,
> > >>>> > AFAIK,
> > >>>> > >> 2
> > >>>> > >> > `normal` source records can also produce the same source
> > >>>> partitions
> > >>>> > and
> > >>>> > >> > they are committed in the order in which they are written.
> > >>>> > >> >
> > >>>> > >> > 8. The SourceTask::produceHeartbeatRecords method returns a
> > >>>> > >> > > List<SourceRecord>, and users can control the heartbeat
> topic
> > >>>> for a
> > >>>> > >> > > connector via the (connector- or worker-level)
> > >>>> > >> "heartbeat.records.topic"
> > >>>> > >> > > property. Since every constructor for the SourceRecord
> class
> > >>>> [2]
> > >>>> > >> > requires a
> > >>>> > >> > > topic to be supplied, what will happen to that topic? Will
> it
> > >>>> be
> > >>>> > >> ignored?
> > >>>> > >> > > If so, I think we should look for a cleaner solution.
> > >>>> > >> >
> > >>>> > >> >
> > >>>> > >> > Sorry, I couldn't quite follow which topic will be ignored in
> > >>>> this
> > >>>> > case.
> > >>>> > >> >
> > >>>> > >> > 9. A large concern raised in the discussion for KAFKA-3821
> was
> > >>>> the
> > >>>> > >> allowing
> > >>>> > >> > > connectors to control the ordering of these special
> > >>>> "offsets-only"
> > >>>> > >> > > emissions and the regular source records returned from
> > >>>> > >> SourceTask::poll.
> > >>>> > >> > > Are we choosing to ignore that concern? If so, can you add
> > >>>> this to
> > >>>> > the
> > >>>> > >> > > rejected alternatives section along with a rationale?
> > >>>> > >> >
> > >>>> > >> >
> > >>>> > >> > One thing to note is that for every connector, the
> > condition
> > >>>> to
> > >>>> > emit
> > >>>> > >> > the heartbeat record is totally up to the connector. For
> > >>>> example, for
> > >>>> > a
> > >>>> > >> > connector which is tracking transactions for an ordered log,
> if
> > >>>> there
> > >>>> > >> are
> > >>>> > >> > open transactions, it might not need to emit heartbeat
> records
> > >>>> when
> > >>>> > the
> > >>>> > >> > timer expires while for file based connectors, if the same
> file
> > >>>> is
> > >>>> > being
> > >>>> > >> > processed again and again due to an SMT or some other
> reasons,
> > >>>> then it
> > >>>> > >> can
> > >>>> > >> > choose to emit that partition. The uber point here is that
> > every
> > >>>> > >> connector
> > >>>> > >> > has its own requirements and the framework can't really make
> > an
> > >>>> > >> assumption
> > >>>> > >> > about it. What the KIP is trying to do is to provide a
> > mechanism
> > >>>> to
> > >>>> > the
> > >>>> > >> > connector to commit new offsets. With this approach, as far
> as
> > I
> > >>>> can
> > >>>> > >> think
> > >>>> > >> > so far, there doesn't seem to be a case of out of order
> > >>>> processing. If
> > >>>> > >> you
> > >>>> > >> > have other concerns/thoughts I would be happy to know them.
> > >>>> > >> >
> > >>>> > >> > 10. If, sometime in the future, we wanted to add
> > framework-level
> > >>>> > support
> > >>>> > >> > > for sending heartbeat records that doesn't require
> connectors
> > >>>> to
> > >>>> > >> > implement
> > >>>> > >> > > any new APIs...
> > >>>> > >> >
> > >>>> > >> >
> > >>>> > >> > The main purpose of producing heartbeat records is to be able
> > to
> > >>>> emit
> > >>>> > >> > offsets w/o any new records. We are using heartbeat records
> to
> > >>>> solve
> > >>>> > the
> > >>>> > >> > primary concern of offsets getting stalled. The reason to do
> > >>>> that was
> > >>>> > >> once
> > >>>> > >> > we get SourceRecords, then the rest of the code is already in
> > >>>> place to
> > >>>> > >> > write it to a topic of interest and commit offsets and that
> > >>>> seemed the
> > >>>> > >> most
> > >>>> > >> > non invasive in terms of framework level changes. If in the
> > >>>> future we
> > >>>> > >> want
> > >>>> > >> > to do a framework-only heartbeat record support, then this
> > would
> > >>>> > create
> > >>>> > >> > confusion as you pointed out. Do you think the choice of the
> > name
> > >>>> > >> heartbeat
> > >>>> > >> > records is creating confusion in this case? Maybe we can call
> > >>>> these
> > >>>> > >> special
> > >>>> > >> > records something else (not sure what at this point) which
> > would
> > >>>> then
> > >>>> > >> > decouple the 2 logically and implementation wise as well?
> > >>>> > >> >
> > >>>> > >> > Thanks!
> > >>>> > >> > Sagar.
> > >>>> > >> >
> > >>>> > >> > On Tue, Mar 28, 2023 at 8:28 PM Chris Egerton
> > >>>> <chr...@aiven.io.invalid
> > >>>> > >
> > >>>> > >> > wrote:
> > >>>> > >> >
> > >>>> > >> > > Hi Sagar,
> > >>>> > >> > >
> > >>>> > >> > > Thanks for the KIP! I have some thoughts.
> > >>>> > >> > >
> > >>>> > >> > > Nits:
> > >>>> > >> > >
> > >>>> > >> > > 1. Shouldn't KAFKA-3821 [1] be linked as the Jira ticket on
> > >>>> the KIP?
> > >>>> > >> Or
> > >>>> > >> > is
> > >>>> > >> > > there a different ticket that should be associated with it?
> > >>>> > >> > > 2. The current state is listed as "Draft". Considering it's
> > >>>> been
> > >>>> > >> brought
> > >>>> > >> > up
> > >>>> > >> > > for discussion, maybe the KIP should be updated to
> > >>>> "Discussion"?
> > >>>> > >> > > 3. Can you add a link for the discussion thread to the KIP?
> > >>>> > >> > > 4. The KIP states that "In this process, offsets are
> written
> > at
> > >>>> > >> regular
> > >>>> > >> > > intervals(driven by `offset.flush.interval.ms`)". This
> isn't
> > >>>> > strictly
> > >>>> > >> > > accurate since, when exactly-once support is enabled,
> offset
> > >>>> commits
> > >>>> > >> can
> > >>>> > >> > > also be performed for each record batch (which is the
> > default)
> > >>>> or
> > >>>> > when
> > >>>> > >> > > explicitly requested by the task instance (if the connector
> > >>>> > implements
> > >>>> > >> > the
> > >>>> > >> > > API to define its own transactions and the user has
> > configured
> > >>>> it to
> > >>>> > >> do
> > >>>> > >> > > so). Maybe better to just say "Offsets are written
> > >>>> periodically"?
> > >>>> > >> > > 5. The description for the (per-connector)
> > >>>> "heartbeat.records.topic
> > >>>> > "
> > >>>> > >> > > property states that it is "Only applicable in distributed
> > >>>> mode; in
> > >>>> > >> > > standalone mode, setting this property will have no
> effect".
> > >>>> Is this
> > >>>> > >> > > correct?
> > >>>> > >> > >
> > >>>> > >> > > Non-nits:
> > >>>> > >> > >
> > >>>> > >> > > 6. It seems (based on both the KIP and discussion on
> > >>>> KAFKA-3821)
> > >>>> > that
> > >>>> > >> the
> > >>>> > >> > > only use case for being able to emit offsets without also
> > >>>> emitting
> > >>>> > >> source
> > >>>> > >> > > records that's been identified so far is for CDC source
> > >>>> connectors
> > >>>> > >> like
> > >>>> > >> > > Debezium. But Debezium already has support for this exact
> > >>>> feature
> > >>>> > >> > (emitting
> > >>>> > >> > > heartbeat records that include offsets that cannot be
> > >>>> associated
> > >>>> > with
> > >>>> > >> > > other, "regular" source records). Why should we add this
> > >>>> feature to
> > >>>> > >> Kafka
> > >>>> > >> > > Connect when the problem it addresses is already solved in
> > the
> > >>>> set
> > >>>> > >> > > connectors that (it seems) would have any need for it, and
> > the
> > >>>> size
> > >>>> > of
> > >>>> > >> > that
> > >>>> > >> > > set is extremely small? If there are other practical use
> > cases
> > >>>> for
> > >>>> > >> > > connectors that would benefit from this feature, please let
> > me
> > >>>> know.
> > >>>> > >> > >
> > >>>> > >> > > 7. If a task produces heartbeat records and source records
> > >>>> that use
> > >>>> > >> the
> > >>>> > >> > > same source partition, which offset will ultimately be
> > >>>> committed?
> > >>>> > >> > >
> > >>>> > >> > > 8. The SourceTask::produceHeartbeatRecords method returns a
> > >>>> > >> > > List<SourceRecord>, and users can control the heartbeat
> topic
> > >>>> for a
> > >>>> > >> > > connector via the (connector- or worker-level)
> > >>>> > >> "heartbeat.records.topic"
> > >>>> > >> > > property. Since every constructor for the SourceRecord
> class
> > >>>> [2]
> > >>>> > >> > requires a
> > >>>> > >> > > topic to be supplied, what will happen to that topic? Will
> it
> > >>>> be
> > >>>> > >> ignored?
> > >>>> > >> > > If so, I think we should look for a cleaner solution.
> > >>>> > >> > >
> > >>>> > >> > > 9. A large concern raised in the discussion for KAFKA-3821
> > was
> > >>>> the
> > >>>> > >> > allowing
> > >>>> > >> > > connectors to control the ordering of these special
> > >>>> "offsets-only"
> > >>>> > >> > > emissions and the regular source records returned from
> > >>>> > >> SourceTask::poll.
> > >>>> > >> > > Are we choosing to ignore that concern? If so, can you add
> > >>>> this to
> > >>>> > the
> > >>>> > >> > > rejected alternatives section along with a rationale?
> > >>>> > >> > >
> > >>>> > >> > > 10. If, sometime in the future, we wanted to add
> > >>>> framework-level
> > >>>> > >> support
> > >>>> > >> > > for sending heartbeat records that doesn't require
> connectors
> > >>>> to
> > >>>> > >> > implement
> > >>>> > >> > > any new APIs (e.g., SourceTask::produceHeartbeatRecords), a
> > >>>> lot of
> > >>>> > >> this
> > >>>> > >> > > would paint us into a corner design-wise. We'd have to
> think
> > >>>> > carefully
> > >>>> > >> > > about which property names would be used, how to account
> for
> > >>>> > >> connectors
> > >>>> > >> > > that have already implemented the
> > >>>> > SourceTask::produceHeartbeatRecords
> > >>>> > >> > > method, etc. In general, it seems like we're trying to
> solve
> > >>>> two
> > >>>> > >> > completely
> > >>>> > >> > > different problems with this single KIP: adding
> > framework-level
> > >>>> > >> support
> > >>>> > >> > for
> > >>>> > >> > > emitting heartbeat records for source connectors, and
> > allowing
> > >>>> > source
> > >>>> > >> > > connectors to emit offsets without also emitting source
> > >>>> records. I
> > >>>> > >> don't
> > >>>> > >> > > mind addressing the two at the same time if the result is
> > >>>> elegant
> > >>>> > and
> > >>>> > >> > > doesn't compromise on the solution for either problem, but
> > that
> > >>>> > >> doesn't
> > >>>> > >> > > seem to be the case here. Of the two problems, could we
> > >>>> describe one
> > >>>> > >> as
> > >>>> > >> > the
> > >>>> > >> > > primary and one as the secondary? If so, we might consider
> > >>>> dropping
> > >>>> > >> the
> > >>>> > >> > > secondary problem from this KIP and addressing it
> separately.
> > >>>> > >> > >
> > >>>> > >> > > [1] - https://issues.apache.org/jira/browse/KAFKA-3821
> > >>>> > >> > > [2] -
> > >>>> > >> > >
> > >>>> > >> > >
> > >>>> > >> >
> > >>>> > >>
> > >>>> >
> > >>>>
> >
> https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceRecord.html
> > >>>> > >> > >
> > >>>> > >> > > Cheers,
> > >>>> > >> > >
> > >>>> > >> > > Chris
> > >>>> > >> > >
> > >>>> > >> > > On Sat, Mar 25, 2023 at 11:18 PM Sagar <
> > >>>> sagarmeansoc...@gmail.com>
> > >>>> > >> > wrote:
> > >>>> > >> > >
> > >>>> > >> > > > Hi John,
> > >>>> > >> > > >
> > >>>> > >> > > > Thanks for taking a look at the KIP!
> > >>>> > >> > > >
> > >>>> > >> > > > The point about stream time not advancing in case of
> > >>>> infrequent
> > >>>> > >> updates
> > >>>> > >> > > is
> > >>>> > >> > > > an interesting one. I can imagine if the upstream
> producer
> > >>>> to a
> > >>>> > >> Kafka
> > >>>> > >> > > > Streams application is a Source Connector which isn't
> > sending
> > >>>> > >> records
> > >>>> > >> > > > frequently (due to the nature of the data ingestion for
> > >>>> example),
> > >>>> > >> then
> > >>>> > >> > the
> > >>>> > >> > > > downstream stream processing can land into the issues you
> > >>>> > described
> > >>>> > >> > > above.
> > >>>> > >> > > >
> > >>>> > >> > > > Which also brings me to the second point you made about
> how
> > >>>> this
> > >>>> > >> would
> > >>>> > >> > be
> > >>>> > >> > > > used by downstream consumers. IIUC, you are referring to
> > the
> > >>>> > >> consumers
> > >>>> > >> > of
> > >>>> > >> > > > the newly added topic, i.e., the heartbeat topic. In my
> mind,
> > >>>> the
> > >>>> > >> > heartbeat
> > >>>> > >> > > > topic is an internal topic (similar to
> > offsets/config/status
> > >>>> topic
> > >>>> > >> in
> > >>>> > >> > > > connect), the main purpose of which is to trick the
> > >>>> framework to
> > >>>> > >> > produce
> > >>>> > >> > > > records to the offsets topic and advance the offsets.
> Since
> > >>>> every
> > >>>> > >> > > connector
> > >>>> > >> > > > could have a different definition of offsets (LSN,
> BinLogID
> > >>>> etc for
> > >>>> > >> > > > example), that logic to determine what the heartbeat
> > records
> > >>>> > should
> > >>>> > >> be
> > >>>> > >> > > > would have to reside in the actual connector.
> > >>>> > >> > > >
> > >>>> > >> > > > Now that I think of it, it could very well be consumed by
> > >>>> > downstream
> > >>>> > >> > > > consumers/ Streams or Flink Applications and be further
> > used
> > >>>> for
> > >>>> > >> some
> > >>>> > >> > > > decision making. A very crude example could be let's say
> if
> > >>>> the
> > >>>> > >> > heartbeat
> > >>>> > >> > > > records sent to the new heartbeat topic include
> timestamps,
> > >>>> then
> > >>>> > the
> > >>>> > >> > > > downstream streams application can use that timestamp to
> > >>>> close any
> > >>>> > >> time
> > >>>> > >> > > > windows. Having said that, it still appears to me that
> it's
> > >>>> > outside
> > >>>> > >> the
> > >>>> > >> > > > scope of the Connect framework and is something which is
> > >>>> difficult
> > >>>> > >> to
> > >>>> > >> > > > generalise because of the variety of Sources and the
> > >>>> definitions
> > >>>> > of
> > >>>> > >> > > > offsets.
> > >>>> > >> > > >
> > >>>> > >> > > > But, I would still be more than happy to add this example
> > if
> > >>>> you
> > >>>> > >> think
> > >>>> > >> > it
> > >>>> > >> > > > can be useful in getting a better understanding of the
> idea
> > >>>> and
> > >>>> > also
> > >>>> > >> > its
> > >>>> > >> > > > utility beyond connect. Please let me know!
> > >>>> > >> > > >
> > >>>> > >> > > > Thanks!
> > >>>> > >> > > > Sagar.
> > >>>> > >> > > >
> > >>>> > >> > > >
> > >>>> > >> > > > On Fri, Mar 24, 2023 at 7:22 PM John Roesler <
> > >>>> vvcep...@apache.org
> > >>>> > >
> > >>>> > >> > > wrote:
> > >>>> > >> > > >
> > >>>> > >> > > > > Thanks for the KIP, Sagar!
> > >>>> > >> > > > >
> > >>>> > >> > > > > At first glance, this seems like a very useful feature.
> > >>>> > >> > > > >
> > >>>> > >> > > > > A common pain point in Streams is when upstream
> producers
> > >>>> don't
> > >>>> > >> send
> > >>>> > >> > > > > regular updates and stream time cannot advance. This
> > causes
> > >>>> > >> > > > > stream-time-driven operations to appear to hang, like
> > time
> > >>>> > windows
> > >>>> > >> > not
> > >>>> > >> > > > > closing, suppressions not firing, etc.
> > >>>> > >> > > > >
> > >>>> > >> > > > > From your KIP, I have a good idea of how the feature
> > would
> > >>>> be
> > >>>> > >> > > integrated
> > >>>> > >> > > > > into connect, and it sounds good to me. I don't quite
> see
> > >>>> how
> > >>>> > >> > > downstream
> > >>>> > >> > > > > clients, such as a downstream Streams or Flink
> > >>>> application, or
> > >>>> > >> users
> > >>>> > >> > of
> > >>>> > >> > > > the
> > >>>> > >> > > > > Consumer would make use of this feature. Could you add
> > some
> > >>>> > >> examples
> > >>>> > >> > of
> > >>>> > >> > > > > that nature?
> > >>>> > >> > > > >
> > >>>> > >> > > > > Thank you,
> > >>>> > >> > > > > -John
> > >>>> > >> > > > >
> > >>>> > >> > > > > On Fri, Mar 24, 2023, at 05:23, Sagar wrote:
> > >>>> > >> > > > > > Hi All,
> > >>>> > >> > > > > >
> > >>>> > >> > > > > > Bumping the thread again.
> > >>>> > >> > > > > >
> > >>>> > >> > > > > > Sagar.
> > >>>> > >> > > > > >
> > >>>> > >> > > > > >
> > >>>> > >> > > > > > On Fri, Mar 10, 2023 at 4:42 PM Sagar <
> > >>>> > >> sagarmeansoc...@gmail.com>
> > >>>> > >> > > > wrote:
> > >>>> > >> > > > > >
> > >>>> > >> > > > > >> Hi All,
> > >>>> > >> > > > > >>
> > >>>> > >> > > > > >> Bumping this discussion thread again.
> > >>>> > >> > > > > >>
> > >>>> > >> > > > > >> Thanks!
> > >>>> > >> > > > > >> Sagar.
> > >>>> > >> > > > > >>
> > >>>> > >> > > > > >> On Thu, Mar 2, 2023 at 3:44 PM Sagar <
> > >>>> > >> sagarmeansoc...@gmail.com>
> > >>>> > >> > > > wrote:
> > >>>> > >> > > > > >>
> > >>>> > >> > > > > >>> Hi All,
> > >>>> > >> > > > > >>>
> > >>>> > >> > > > > >>> I wanted to create a discussion thread for KIP-910:
> > >>>> > >> > > > > >>>
> > >>>> > >> > > > > >>>
> > >>>> > >> > > > > >>>
> > >>>> > >> > > > >
> > >>>> > >> > > >
> > >>>> > >> > >
> > >>>> > >> >
> > >>>> > >>
> > >>>> >
> > >>>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
> > >>>> > >> > > > > >>>
> > >>>> > >> > > > > >>> Thanks!
> > >>>> > >> > > > > >>> Sagar.
> > >>>> > >> > > > > >>>
> > >>>> > >> > > > > >>
> > >>>> > >> > > > >
> > >>>> > >> > > >
> > >>>> > >> > >
> > >>>> > >> >
> > >>>> > >>
> > >>>> > >
> > >>>> >
> > >>>>
> > >>>
> >
>
