Hi Sagar,

Thanks for your continued work on this KIP! Here are my thoughts on your
updated proposal:

1) In the proposed changes section where you talk about modifying the
offsets, could you please clarify that tasks shouldn't modify the offsets
map that is passed as an argument? Currently, the distinction between the
offsets map passed as an argument and the offsets map that is returned is
not very clear in numerous places.

2) The default return value of Optional.empty() seems to be fairly
non-intuitive considering that the return value is supposed to be the
offsets that are to be committed. Can we consider simply returning the
offsets argument itself by default instead?

3) The KIP states that "It is also possible that a task might choose to
send a tombstone record as an offset. This is not recommended and to
prevent connectors shooting themselves in the foot due to this" - could you
please clarify why this is not recommended / supported?

4) The KIP states that "If a task returns an Optional of a null object or
an Optional of an empty map, even for such cases the behaviour would be
disabled." - since this is an optional API that source task
implementations don't necessarily need to implement, I don't fully follow
why the return type of the proposed "updateOffsets" method needs to be an
Optional. Can we not simply use the Map as the return type instead?
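
To make points 1), 2) and 4) concrete, here is a rough sketch of the shape I
have in mind. It is only an illustration under assumptions: the
OffsetUpdatingTask interface, the AdvancingTask class and the "file" /
"position" keys are hypothetical stand-ins, and the method signature is my
guess rather than what the KIP or Connect currently define.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class UpdateOffsetsSketch {

    // Hypothetical stand-in for the proposed hook; NOT the real Connect SourceTask API.
    interface OffsetUpdatingTask {
        // Assumed default: return the argument, i.e. commit what was about to be committed.
        default Map<Map<String, Object>, Map<String, Object>> updateOffsets(
                Map<Map<String, Object>, Map<String, Object>> offsets) {
            return offsets;
        }
    }

    // Hypothetical task that advances the offset of a partition with no new records.
    static class AdvancingTask implements OffsetUpdatingTask {
        @Override
        public Map<Map<String, Object>, Map<String, Object>> updateOffsets(
                Map<Map<String, Object>, Map<String, Object>> offsets) {
            // Work on a copy; the argument itself is never modified.
            Map<Map<String, Object>, Map<String, Object>> updated = new HashMap<>(offsets);
            updated.put(single("file", "b.csv"), single("position", 120L));
            return updated;
        }
    }

    // Small helper to build a one-entry source partition or source offset map.
    static Map<String, Object> single(String key, Object value) {
        return Collections.singletonMap(key, value);
    }

    // Offsets the framework is about to commit, in this toy model.
    static Map<Map<String, Object>, Map<String, Object>> aboutToCommit() {
        Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>();
        offsets.put(single("file", "a.csv"), single("position", 42L));
        // Read-only view, so an accidental in-place update fails fast.
        return Collections.unmodifiableMap(offsets);
    }

    public static void main(String[] args) {
        Map<Map<String, Object>, Map<String, Object>> offsets = aboutToCommit();
        System.out.println("default:  " + new OffsetUpdatingTask() { }.updateOffsets(offsets));
        System.out.println("advanced: " + new AdvancingTask().updateOffsets(offsets));
    }
}
```

Passing a read-only view is one possible way for the framework to enforce
the "don't modify the argument" contract; documenting it in the Javadoc
alone would also work.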

5) The KIP states that "The offsets passed to the updateOffsets  method
would be the offset from the latest source record amongst all source
records per partition. This way, if the source offset for a given source
partition is updated, that offset is the one that gets committed for the
source partition." - we should clarify that the "latest" offset refers to
the offsets that are about to be committed, and not the latest offsets
returned from SourceTask::poll so far (see related discussion in
https://issues.apache.org/jira/browse/KAFKA-15091 and
https://issues.apache.org/jira/browse/KAFKA-5716).

6) We haven't used the terminology of "Atleast Once Semantics" elsewhere in
Connect since the framework itself does not (and cannot) make any
guarantees on the delivery semantics. Depending on the source connector and
the source system, both at-least-once and at-most-once semantics (for
example, a source system where reads are destructive) are possible. We
should avoid introducing this terminology in the KIP and instead refer to
this scenario as exactly-once support being disabled.

7) Similar to the above point, we should remove the use of the term
"Exactly Once Semantics" and instead refer to exactly-once support being
enabled since the framework can't guarantee exactly-once semantics for all
possible source connectors (for example - a message queue source connector
where offsets are essentially managed in the source system via an ack
mechanism).

8) In a previous attempt to fix this gap in functionality, a significant
concern was raised on offsets ordering guarantees when we retry sending a
batch of records (ref -
https://github.com/apache/kafka/pull/5553/files#r213329307). It doesn't
look like this KIP addresses that concern either? In the case where
exactly-once support is disabled, if we update the committableOffsets with
the offsets provided by the task through the new updateOffsets method,
these offsets could be committed before older "regular" offsets are
committed (due to producer retries), which could then lead to an
inconsistency if the send operation eventually succeeds.
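
A toy model of the sequence I'm worried about is sketched below. It is not
the actual Connect runtime code: OffsetStore, the "table-a" partition and
the offset values 42 and 100 are all made up for illustration, and the store
simply keeps the last value written per source partition (much like a
compacted offsets topic would).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OffsetOrderingSketch {

    // Hypothetical offset store: the last write per source partition wins.
    static final class OffsetStore {
        private final Map<String, Long> committed = new HashMap<>();
        private final List<String> history = new ArrayList<>();

        void commit(String partition, long offset) {
            committed.put(partition, offset);
            history.add(partition + "=" + offset);
        }

        Long get(String partition) {
            return committed.get(partition);
        }

        List<String> history() {
            return history;
        }
    }

    // Replays the problematic ordering and returns the offset that ends up stored.
    static long simulate(OffsetStore store) {
        String partition = "table-a";
        long inFlightOffset = 42L;      // record sent but not yet acked (producer is retrying)
        long taskSuppliedOffset = 100L; // newer offset handed back via the new hook

        // Flush 1: the record with offset 42 is still unacked, so only the
        // task-supplied offset is committable.
        store.commit(partition, taskSuppliedOffset);

        // The retry eventually succeeds. Flush 2 then commits the older offset,
        // overwriting the newer one.
        store.commit(partition, inFlightOffset);

        return store.get(partition);
    }

    public static void main(String[] args) {
        OffsetStore store = new OffsetStore();
        long finalOffset = simulate(store);
        System.out.println("commit order: " + store.history());
        System.out.println("final offset: " + finalOffset);
    }
}
```

In this model the stored offset moves backwards after the retry succeeds,
which is the inconsistency described above.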

9) The KIP states that when exactly-once support is enabled, the new
SourceTask::updateOffsets method will be invoked only when an offset flush
is attempted. If the connector is configured to use a connector-defined
transaction boundary rather than a poll-based or interval-based boundary,
isn't it possible that we don't call SourceTask::updateOffsets until there
are actual records that are also being returned through poll (which would
defeat the primary motivation of the KIP)? Or are we making the assumption
that the connector-defined transaction boundary should handle this case
appropriately if needed (i.e. source tasks should occasionally request a
transaction commit via their transaction context if they want offsets to
be committed without producing records)? If so, I think we should
explicitly call that out in the KIP.

10) The Javadoc for SourceTask::updateOffsets in the section on public
interfaces also has the same issue with the definition of latest offsets
that I've mentioned above (latest offsets from poll versus latest offsets
that are about to be committed).

11) The Javadoc for SourceTask::updateOffsets also introduces the same
confusion w.r.t updating offsets that I've mentioned above (modifying the
offsets map argument versus returning a modified copy of the offsets map).

12) In the section on compatibility, we should explicitly mention that
connectors which implement the new method will still be compatible with
older Connect runtimes where the method will simply not be invoked.


Thanks,
Yash

On Wed, Jun 21, 2023 at 10:25 PM Sagar <sagarmeansoc...@gmail.com> wrote:

> Hi All,
>
> I have created this PR: https://github.com/apache/kafka/pull/13899 which
> implements the approach outlined in the latest version of the KIP. I
> thought I could use this to validate the approach based on my understanding
> while the KIP itself gets reviewed. I can always change the implementation
> once we move to a final decision on the KIP.
>
> Thanks!
> Sagar.
>
>
> On Wed, Jun 14, 2023 at 4:59 PM Sagar <sagarmeansoc...@gmail.com> wrote:
>
> > Hey All,
> >
> > Bumping this discussion thread again to see how the modified KIP looks.
> >
> > Thanks!
> > Sagar.
> >
> > On Mon, May 29, 2023 at 8:12 PM Sagar <sagarmeansoc...@gmail.com> wrote:
> >
> >> Hi,
> >>
> >> Bumping this thread again for further reviews.
> >>
> >> Thanks!
> >> Sagar.
> >>
> >> On Fri, May 12, 2023 at 3:38 PM Sagar <sagarmeansoc...@gmail.com>
> wrote:
> >>
> >>> Hi All,
> >>>
> >>> Thanks for the comments/reviews. I have updated the KIP
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
> >>> with a newer approach which shelves the need for an explicit topic.
> >>>
> >>> Please review again and let me know what you think.
> >>>
> >>> Thanks!
> >>> Sagar.
> >>>
> >>>
> >>> On Mon, Apr 24, 2023 at 3:35 PM Yash Mayya <yash.ma...@gmail.com>
> wrote:
> >>>
> >>>> Hi Sagar,
> >>>>
> >>>> Thanks for the KIP! I have a few questions and comments:
> >>>>
> >>>> 1) I agree with Chris' point about the separation of a connector
> >>>> heartbeat
> >>>> mechanism and allowing source connectors to generate offsets without
> >>>> producing data. What is the purpose of the heartbeat topic here and
> are
> >>>> there any concrete use cases for downstream consumers on this topic?
> Why
> >>>> can't we instead simply introduce a mechanism to retrieve a list of
> >>>> source
> >>>> partition / source offset pairs from the source tasks?
> >>>>
> >>>> 2) With the currently described mechanism, the new
> >>>> "SourceTask::produceHeartbeatRecords" method returns a
> >>>> "List<SourceRecord>"
> >>>> - what happens with the topic in each of these source records? Chris
> >>>> pointed this out above, but it doesn't seem to have been addressed?
> The
> >>>> "SourceRecord" class also has a bunch of other fields which will be
> >>>> irrelevant here (partition, key / value schema, key / value data,
> >>>> timestamp, headers). In fact, it seems like only the source partition
> >>>> and
> >>>> source offset are relevant here, so we should either introduce a new
> >>>> abstraction or simply use a data structure like a mapping from source
> >>>> partitions to source offsets (adds to the above point)?
> >>>>
> >>>> 3) I'm not sure I fully follow why the heartbeat timer / interval is
> >>>> needed? What are the downsides of
> >>>> calling "SourceTask::produceHeartbeatRecords" in every execution loop
> >>>> (similar to the existing "SourceTask::poll" method)? Is this only to
> >>>> prevent the generation of a lot of offset records? Since Connect's
> >>>> offsets
> >>>> topics are log compacted (and source partitions are used as keys for
> >>>> each
> >>>> source offset), I'm not sure if such concerns are valid and such a
> >>>> heartbeat timer / interval mechanism is required?
> >>>>
> >>>> 4) The first couple of rejected alternatives state that the use of a
> >>>> null
> >>>> topic / key / value are preferably avoided - but the current proposal
> >>>> would
> >>>> also likely require connectors to use such workarounds (null topic
> when
> >>>> the
> >>>> heartbeat topic is configured at a worker level and always for the
> key /
> >>>> value)?
> >>>>
> >>>> 5) The third rejected alternative talks about subclassing the
> >>>> "SourceRecord" class - this presumably means allowing connectors to
> pass
> >>>> special offset only records via the existing poll mechanism? Why was
> >>>> this
> >>>> considered a more invasive option? Was it because of the backward
> >>>> compatibility issues that would be introduced for plugins using the
> new
> >>>> public API class that still need to be deployed onto older Connect
> >>>> workers?
> >>>>
> >>>> Thanks,
> >>>> Yash
> >>>>
> >>>> On Fri, Apr 14, 2023 at 6:45 PM Sagar <sagarmeansoc...@gmail.com>
> >>>> wrote:
> >>>>
> >>>> > One thing I forgot to mention in my previous email was that the
> >>>> reason I
> >>>> > chose to include the opt-in behaviour via configs was that the users
> >>>> of the
> >>>> > connector know their workload patterns. If the workload is such that
> >>>> the
> >>>> >  connector would receive regular valid updates then there’s ideally
> >>>> no need
> >>>> > for moving offsets since it would update automatically.
> >>>> >
> >>>> > This way they aren’t forced to use this feature and can use it only
> >>>> when
> >>>> > the workload is expected to be batchy or not frequent.
> >>>> >
> >>>> > Thanks!
> >>>> > Sagar.
> >>>> >
> >>>> >
> >>>> > On Fri, 14 Apr 2023 at 5:32 PM, Sagar <sagarmeansoc...@gmail.com>
> >>>> wrote:
> >>>> >
> >>>> > > Hi Chris,
> >>>> > >
> >>>> > > Thanks for following up on the response. Sharing my thoughts
> >>>> further:
> >>>> > >
> >>>> > > If we want to add support for connectors to emit offsets without
> >>>> > >> accompanying source records, we could (and IMO should) do that
> >>>> without
> >>>> > >> requiring users to manually enable that feature by adjusting
> >>>> worker or
> >>>> > >> connector configurations.
> >>>> > >
> >>>> > >
> >>>> > > With the current KIP design, I have tried to implement this in an
> >>>> opt-in
> >>>> > > manner via configs. I guess what you are trying to say is that
> this
> >>>> > doesn't
> >>>> > > need a config of its own and instead could be part of the poll ->
> >>>> > > transform etc -> produce -> commit cycle. That way, the users
> don't
> >>>> need
> >>>> > to
> >>>> > > set any config and if the connector supports moving offsets w/o
> >>>> producing
> >>>> > > SourceRecords, it should happen automatically. Is that correct? If
> >>>> that
> >>>> > > is the concern, then I can think of not exposing a config and try
> >>>> to make
> >>>> > > this process automatically. That should ease the load on connector
> >>>> users,
> >>>> > > but your point about cognitive load on Connector developers, I am
> >>>> still
> >>>> > not
> >>>> > > sure how to address that. The offsets are privy to a connector and
> >>>> the
> >>>> > > framework at best can provide hooks to the tasks to update their
> >>>> offsets.
> >>>> > > Connector developers would still have to consider all cases before
> >>>> > updating
> >>>> > > offsets.  And if I ignore the heartbeat topic and heartbeat
> >>>> interval ms
> >>>> > > configs, then what the KIP proposes currently isn't much different
> >>>> in
> >>>> > that
> >>>> > > regard. Just that it produces a List of SourceRecord which can be
> >>>> changed
> >>>> > > to a Map of SourcePartition and their offsets if you think that
> >>>> would
> >>>> > > simplify things. Are there other cases in your mind which need
> >>>> > addressing?
> >>>> > >
> >>>> > > Here's my take on the usecases:
> >>>> > >
> >>>> > >    1. Regarding the example about SMTs with Object Storage based
> >>>> > >    connectors, it was one of the scenarios identified. We have
> some
> >>>> > connectors
> >>>> > >    that rely on the offsets topic to check if the next batch of
> >>>> files
> >>>> > should
> >>>> > >    be processed and because of filtering of the last record from
> the
> >>>> > files,
> >>>> > >    the eof supposedly is  never reached and the connector can't
> >>>> commit
> >>>> > offsets
> >>>> > >    for that source partition(file). If there was a mechanism to
> >>>> update
> >>>> > offsets
> >>>> > >    for such a source file, then with some moderately complex state
> >>>> > tracking,
> >>>> > >    the connector can mark that file as processed and proceed.
> >>>> > >    2. There's another use case with the same class of connectors
> >>>> where if
> >>>> > >    a file is malformed, then the connector couldn't produce any
> >>>> offsets
> >>>> > >    because the file couldn't get processed completely. To handle
> >>>> such
> >>>> > cases,
> >>>> > >    the connector developers have introduced a dev/null sort of
> topic
> >>>> > where
> >>>> > >    they produce a record to this corrupted file topic and move the
> >>>> offset
> >>>> > >    somehow. This topic ideally isn't needed and with a mechanism
> to
> >>>> > update
> >>>> > >    offsets would have helped in this case as well.
> >>>> > >    3. Coming to CDC based connectors,
> >>>> > >       1. We had a similar issue with Oracle CDC source connector
> and
> >>>> > >       needed to employ the same heartbeat mechanism to get around
> >>>> it.
> >>>> > >       2. MongoDB CDC source Connector  has employed the same
> >>>> heartbeat
> >>>> > >       mechanism; see `heartbeat.interval.ms` here (
> >>>> > >
> >>>> >
> >>>>
> https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/error-handling/
> >>>> > >       ).
> >>>> > >       3. Another CDC connector for ScyllaDB employs a similar
> >>>> mechanism.
> >>>> > >
> >>>> >
> >>>>
> https://github.com/scylladb/scylla-cdc-source-connector/search?q=heartbeat
> >>>> > >       4. For CDC based connectors, you could argue that these
> >>>> connectors
> >>>> > >       have been able to solve this error then why do we need
> >>>> framework
> >>>> > level
> >>>> > >       support. But the point I am trying to make is that this
> >>>> limitation
> >>>> > from the
> >>>> > >       framework is forcing CDC connector developers to implement
> >>>> > per-connector
> >>>> > >       solutions/hacks(at times). And there could always be more
> CDC
> >>>> > connectors in
> >>>> > >       the pipeline forcing them to take a similar route as well.
> >>>> > >    4. There's also a case at times with CDC source connectors
> which
> >>>> are
> >>>> > >    REST Api / Web Service based(Zendesk Source Connector for
> >>>> example) .
> >>>> > These
> >>>> > >    connectors typically use timestamps from the responses as
> >>>> offsets. If
> >>>> > >    there's a long period of inactivity wherein the API invocations
> >>>> don't
> >>>> > >    return any data, then the offsets won't move and the connector
> >>>> would
> >>>> > keep
> >>>> > >    using the same timestamp that it received from the last
> non-empty
> >>>> > response.
> >>>> > >    If this period of inactivity keeps growing, and the API imposes
> >>>> any
> >>>> > limits
> >>>> > >    on how far back we can go in terms of window start, then this
> >>>> could
> >>>> > >    potentially be a problem. In this case even though the
> connector
> >>>> was
> >>>> > caught
> >>>> > >    up with all the responses, it may need to snapshot again. In
> >>>> this case
> >>>> > >    updating offsets can easily help since all the connector needs
> >>>> to do
> >>>> > is to
> >>>> > >    move the timestamp which would move the offset inherently.
> >>>> > >
> >>>> > > I still believe that this is something the framework should
> support
> >>>> OOB
> >>>> > > irrespective of whether the connectors have been able to get
> around
> >>>> this
> >>>> > > restriction or not.
> >>>> > >
> >>>> > > Lastly, about your comments here:
> >>>> > >
> >>>> > > I'm also not sure that it's worth preserving the current behavior
> >>>> that
> >>>> > >> offsets for records that have been filtered out via SMT are not
> >>>> > committed.
> >>>> > >
> >>>> > >
> >>>> > > Let me know if we need a separate JIRA to track this? This somehow
> >>>> didn't
> >>>> > > look related to this discussion.
> >>>> > >
> >>>> > > Thanks!
> >>>> > > Sagar.
> >>>> > >
> >>>> > >
> >>>> > > On Wed, Apr 12, 2023 at 9:34 PM Chris Egerton
> >>>> <chr...@aiven.io.invalid>
> >>>> > > wrote:
> >>>> > >
> >>>> > >> Hi Sagar,
> >>>> > >>
> >>>> > >> I'm sorry, I'm still not convinced that this design solves the
> >>>> > problem(s)
> >>>> > >> it sets out to solve in the best way possible. I tried to
> >>>> highlight this
> >>>> > >> in
> >>>> > >> my last email:
> >>>> > >>
> >>>> > >> > In general, it seems like we're trying to solve two completely
> >>>> > different
> >>>> > >> problems with this single KIP: adding framework-level support for
> >>>> > emitting
> >>>> > >> heartbeat records for source connectors, and allowing source
> >>>> connectors
> >>>> > to
> >>>> > >> emit offsets without also emitting source records. I don't mind
> >>>> > addressing
> >>>> > >> the two at the same time if the result is elegant and doesn't
> >>>> compromise
> >>>> > >> on
> >>>> > >> the solution for either problem, but that doesn't seem to be the
> >>>> case
> >>>> > >> here.
> >>>> > >> Of the two problems, could we describe one as the primary and one
> >>>> as the
> >>>> > >> secondary? If so, we might consider dropping the secondary
> problem
> >>>> from
> >>>> > >> this KIP and addressing it separately.
> >>>> > >>
> >>>> > >> If we wanted to add support for heartbeat records, we could (and
> >>>> IMO
> >>>> > >> should) do that without requiring connectors to implement any new
> >>>> > methods
> >>>> > >> and only require adjustments to worker or connector
> configurations
> >>>> by
> >>>> > >> users
> >>>> > >> in order to enable that feature.
> >>>> > >>
> >>>> > >> If we want to add support for connectors to emit offsets without
> >>>> > >> accompanying source records, we could (and IMO should) do that
> >>>> without
> >>>> > >> requiring users to manually enable that feature by adjusting
> >>>> worker or
> >>>> > >> connector configurations.
> >>>> > >>
> >>>> > >>
> >>>> > >> I'm also not sure that it's worth preserving the current behavior
> >>>> that
> >>>> > >> offsets for records that have been filtered out via SMT are not
> >>>> > committed.
> >>>> > >> I can't think of a case where this would be useful and there are
> >>>> > obviously
> >>>> > >> plenty where it isn't. There's also a slight discrepancy in how
> >>>> these
> >>>> > >> kinds
> >>>> > >> of records are treated by the Connect runtime now; if a record is
> >>>> > dropped
> >>>> > >> because of an SMT, then its offset isn't committed, but if it's
> >>>> dropped
> >>>> > >> because exactly-once support is enabled and the connector chose
> to
> >>>> abort
> >>>> > >> the batch containing the record, then its offset is still
> >>>> committed.
> >>>> > After
> >>>> > >> thinking carefully about the aborted transaction behavior, we
> >>>> realized
> >>>> > >> that
> >>>> > >> it was fine to commit the offsets for those records, and I
> believe
> >>>> that
> >>>> > >> the
> >>>> > >> same logic can be applied to any record that we're done trying to
> >>>> send
> >>>> > to
> >>>> > >> Kafka (regardless of whether it was sent correctly, dropped due
> to
> >>>> > >> producer
> >>>> > >> error, filtered via SMT, etc.).
> >>>> > >>
> >>>> > >> I also find the file-based source connector example a little
> >>>> confusing.
> >>>> > >> What about that kind of connector causes the offset for the last
> >>>> record
> >>>> > of
> >>>> > >> a file to be treated differently? Is there anything different
> about
> >>>> > >> filtering that record via SMT vs. dropping it altogether because
> >>>> of an
> >>>> > >> asynchronous producer error with "errors.tolerance" set to "all"?
> >>>> And
> >>>> > >> finally, how would such a connector use the design proposed here?
> >>>> > >>
> >>>> > >> Finally, I don't disagree that if there are other legitimate use
> >>>> cases
> >>>> > >> that
> >>>> > >> would be helped by addressing KAFKA-3821, we should try to solve
> >>>> that
> >>>> > >> issue
> >>>> > >> in the Kafka Connect framework instead of requiring individual
> >>>> > connectors
> >>>> > >> to implement their own solutions. But the cognitive load added by
> >>>> the
> >>>> > >> design proposed here, for connector developers and Connect
> cluster
> >>>> > >> administrators alike, costs too much to justify by pointing to an
> >>>> > >> already-solved problem encountered by a single group of
> connectors
> >>>> > (i.e.,
> >>>> > >> Debezium). This is why I think it's crucial that we identify
> >>>> realistic
> >>>> > >> cases where this feature would actually be useful, and right
> now, I
> >>>> > don't
> >>>> > >> think any have been provided (at least, not ones that have
> already
> >>>> been
> >>>> > >> addressed or could be addressed with much simpler changes).
> >>>> > >>
> >>>> > >> Cheers,
> >>>> > >>
> >>>> > >> Chris
> >>>> > >>
> >>>> > >> On Tue, Apr 11, 2023 at 7:30 AM Sagar <sagarmeansoc...@gmail.com
> >
> >>>> > wrote:
> >>>> > >>
> >>>> > >> > Hi Chris,
> >>>> > >> >
> >>>> > >> > Thanks for your detailed feedback!
> >>>> > >> >
> >>>> > >> > nits: I have taken care of them now. Thanks for pointing those
> >>>> out.
> >>>> > >> >
> >>>> > >> > non-nits:
> >>>> > >> >
> >>>> > >> > 6) It seems (based on both the KIP and discussion on
> KAFKA-3821)
> >>>> that
> >>>> > >> the
> >>>> > >> > > only use case for being able to emit offsets without also
> >>>> emitting
> >>>> > >> source
> >>>> > >> > > records that's been identified so far is for CDC source
> >>>> connectors
> >>>> > >> like
> >>>> > >> > > Debezium.
> >>>> > >> >
> >>>> > >> >
> >>>> > >> > I am aware of at least one more case where the non production of
> >>>> > offsets
> >>>> > >> > (due to non production of records ) leads to the failure of
> >>>> connectors
> >>>> > >> when
> >>>> > >> > the source purges the records of interest. This happens in File
> >>>> based
> >>>> > >> > source connectors  (like s3/blob storage ) in which if the last
> >>>> record
> >>>> > >> from
> >>>> > >> > a file is filtered due to an SMT, then that particular file is
> >>>> never
> >>>> > >> > committed to the source partition and eventually when the file
> is
> >>>> > >> deleted
> >>>> > >> > from the source and the connector is restarted due to some
> >>>> reason, it
> >>>> > >> > fails.
> >>>> > >> > Moreover, I feel the reason this support should be there in the
> >>>> Kafka
> >>>> > >> > Connect framework is because this is a restriction of the
> >>>> framework
> >>>> > and
> >>>> > >> > today the framework provides no support for getting around this
> >>>> > >> limitation.
> >>>> > >> > Every connector has its own way of handling offsets and having
> >>>> each
> >>>> > >> > connector handle this restriction in its own way can make it
> >>>> complex.
> >>>> > >> > Whether we choose to do it the way this KIP prescribes or any
> >>>> other
> >>>> > way
> >>>> > >> is
> >>>> > >> > up for debate but IMHO, the framework should provide a way of
> >>>> > >> > getting around this limitation.
> >>>> > >> >
> >>>> > >> > 7. If a task produces heartbeat records and source records that
> >>>> use
> >>>> > the
> >>>> > >> > > same source partition, which offset will ultimately be
> >>>> committed?
> >>>> > >> >
> >>>> > >> >
> >>>> > >> > The idea is to add the records returned by the
> >>>> > `produceHeartbeatRecords`
> >>>> > >> > to  the same `toSend` list within
> >>>> `AbstractWorkerSourceTask#execute`.
> >>>> > >> The
> >>>> > >> > `produceHeartbeatRecords` would be invoked before we make the
> >>>> `poll`
> >>>> > >> call.
> >>>> > >> > Hence, the offsets committed would be in the same order in
> which
> >>>> they
> >>>> > >> would
> >>>> > >> > be written. Note that, the onus is on the Connector
> >>>> implementation to
> >>>> > >> not
> >>>> > >> > return records which can lead to data loss or data going out of
> >>>> order.
> >>>> > >> The
> >>>> > >> > framework would just commit based on whatever is supplied.
> Also,
> >>>> > AFAIK,
> >>>> > >> 2
> >>>> > >> > `normal` source records can also produce the same source
> >>>> partitions
> >>>> > and
> >>>> > >> > they are committed in the order in which they are written.
> >>>> > >> >
> >>>> > >> > 8. The SourceTask::produceHeartbeatRecords method returns a
> >>>> > >> > > List<SourceRecord>, and users can control the heartbeat topic
> >>>> for a
> >>>> > >> > > connector via the (connector- or worker-level)
> >>>> > >> "heartbeat.records.topic"
> >>>> > >> > > property. Since every constructor for the SourceRecord class
> >>>> [2]
> >>>> > >> > requires a
> >>>> > >> > > topic to be supplied, what will happen to that topic? Will it
> >>>> be
> >>>> > >> ignored?
> >>>> > >> > > If so, I think we should look for a cleaner solution.
> >>>> > >> >
> >>>> > >> >
> >>>> > >> > Sorry, I couldn't quite follow which topic will be ignored in
> >>>> this
> >>>> > case.
> >>>> > >> >
> >>>> > >> > 9. A large concern raised in the discussion for KAFKA-3821 was
> >>>> the
> >>>> > >> allowing
> >>>> > >> > > connectors to control the ordering of these special
> >>>> "offsets-only"
> >>>> > >> > > emissions and the regular source records returned from
> >>>> > >> SourceTask::poll.
> >>>> > >> > > Are we choosing to ignore that concern? If so, can you add
> >>>> this to
> >>>> > the
> >>>> > >> > > rejected alternatives section along with a rationale?
> >>>> > >> >
> >>>> > >> >
> >>>> > >> > One thing to note is that for every connector, the
> condition
> >>>> to
> >>>> > emit
> >>>> > >> > the heartbeat record is totally up to the connector. For
> >>>> example, for
> >>>> > a
> >>>> > >> > connector which is tracking transactions for an ordered log, if
> >>>> there
> >>>> > >> are
> >>>> > >> > open transactions, it might not need to emit heartbeat records
> >>>> when
> >>>> > the
> >>>> > >> > timer expires while for file based connectors, if the same file
> >>>> is
> >>>> > being
> >>>> > >> > processed again and again due to an SMT or some other reasons,
> >>>> then it
> >>>> > >> can
> >>>> > >> > choose to emit that partition. The uber point here is that
> every
> >>>> > >> connector
> >>>> > >> > has its own requirements and the framework can't really make
> an
> >>>> > >> assumption
> >>>> > >> > about it. What the KIP is trying to do is to provide a
> mechanism
> >>>> to
> >>>> > the
> >>>> > >> > connector to commit new offsets. With this approach, as far as
> I
> >>>> can
> >>>> > >> think
> >>>> > >> > so far, there doesn't seem to be a case of out of order
> >>>> processing. If
> >>>> > >> you
> >>>> > >> > have other concerns/thoughts I would be happy to know them.
> >>>> > >> >
> >>>> > >> > 10. If, sometime in the future, we wanted to add
> framework-level
> >>>> > support
> >>>> > >> > > for sending heartbeat records that doesn't require connectors
> >>>> to
> >>>> > >> > implement
> >>>> > >> > > any new APIs...
> >>>> > >> >
> >>>> > >> >
> >>>> > >> > The main purpose of producing heartbeat records is to be able
> to
> >>>> emit
> >>>> > >> > offsets w/o any new records. We are using heartbeat records to
> >>>> solve
> >>>> > the
> >>>> > >> > primary concern of offsets getting stalled. The reason to do
> >>>> that was
> >>>> > >> once
> >>>> > >> > we get SourceRecords, then the rest of the code is already in
> >>>> place to
> >>>> > >> > write it to a topic of interest and commit offsets and that
> >>>> seemed the
> >>>> > >> most
> >>>> > >> > non invasive in terms of framework level changes. If in the
> >>>> future we
> >>>> > >> want
> >>>> > >> > to do a framework-only heartbeat record support, then this
> would
> >>>> > create
> >>>> > >> > confusion as you pointed out. Do you think the choice of the
> name
> >>>> > >> heartbeat
> >>>> > >> > records is creating confusion in this case? Maybe we can call
> >>>> these
> >>>> > >> special
> >>>> > >> > records something else (not sure what at this point) which
> would
> >>>> then
> >>>> > >> > decouple the 2 logically and implementation wise as well?
> >>>> > >> >
> >>>> > >> > Thanks!
> >>>> > >> > Sagar.
> >>>> > >> >
> >>>> > >> > On Tue, Mar 28, 2023 at 8:28 PM Chris Egerton
> >>>> <chr...@aiven.io.invalid
> >>>> > >
> >>>> > >> > wrote:
> >>>> > >> >
> >>>> > >> > > Hi Sagar,
> >>>> > >> > >
> >>>> > >> > > Thanks for the KIP! I have some thoughts.
> >>>> > >> > >
> >>>> > >> > > Nits:
> >>>> > >> > >
> >>>> > >> > > 1. Shouldn't KAFKA-3821 [1] be linked as the Jira ticket on
> >>>> the KIP?
> >>>> > >> Or
> >>>> > >> > is
> >>>> > >> > > there a different ticket that should be associated with it?
> >>>> > >> > > 2. The current state is listed as "Draft". Considering it's
> >>>> been
> >>>> > >> brought
> >>>> > >> > up
> >>>> > >> > > for discussion, maybe the KIP should be updated to
> >>>> "Discussion"?
> >>>> > >> > > 3. Can you add a link for the discussion thread to the KIP?
> >>>> > >> > > 4. The KIP states that "In this process, offsets are written
> at
> >>>> > >> regular
> >>>> > >> > > intervals(driven by `offset.flush.interval.ms`)". This isn't
> >>>> > strictly
> >>>> > >> > > accurate since, when exactly-once support is enabled, offset
> >>>> commits
> >>>> > >> can
> >>>> > >> > > also be performed for each record batch (which is the
> default)
> >>>> or
> >>>> > when
> >>>> > >> > > explicitly requested by the task instance (if the connector
> >>>> > implements
> >>>> > >> > the
> >>>> > >> > > API to define its own transactions and the user has
> configured
> >>>> it to
> >>>> > >> do
> >>>> > >> > > so). Maybe better to just say "Offsets are written
> >>>> periodically"?
> >>>> > >> > > 5. The description for the (per-connector)
> >>>> "heartbeat.records.topic
> >>>> > "
> >>>> > >> > > property states that it is "Only applicable in distributed
> >>>> mode; in
> >>>> > >> > > standalone mode, setting this property will have no effect".
> >>>> Is this
> >>>> > >> > > correct?
> >>>> > >> > >
> >>>> > >> > > Non-nits:
> >>>> > >> > >
> >>>> > >> > > 6. It seems (based on both the KIP and discussion on
> >>>> KAFKA-3821)
> >>>> > that
> >>>> > >> the
> >>>> > >> > > only use case for being able to emit offsets without also
> >>>> emitting
> >>>> > >> source
> >>>> > >> > > records that's been identified so far is for CDC source
> >>>> connectors
> >>>> > >> like
> >>>> > >> > > Debezium. But Debezium already has support for this exact
> >>>> feature
> >>>> > >> > (emitting
> >>>> > >> > > heartbeat records that include offsets that cannot be
> >>>> associated
> >>>> > with
> >>>> > >> > > other, "regular" source records). Why should we add this
> >>>> feature to
> >>>> > >> Kafka
> >>>> > >> > > Connect when the problem it addresses is already solved in
> the
> >>>> set
> >>>> > >> > > connectors that (it seems) would have any need for it, and
> the
> >>>> size
> >>>> > of
> >>>> > >> > that
> >>>> > >> > > set is extremely small? If there are other practical use
> cases
> >>>> for
> >>>> > >> > > connectors that would benefit from this feature, please let
> me
> >>>> know.
> >>>> > >> > >
> >>>> > >> > > 7. If a task produces heartbeat records and source records
> >>>> that use
> >>>> > >> the
> >>>> > >> > > same source partition, which offset will ultimately be
> >>>> committed?
> >>>> > >> > >
> >>>> > >> > > 8. The SourceTask::produceHeartbeatRecords method returns a
> >>>> > >> > > List<SourceRecord>, and users can control the heartbeat topic
> >>>> for a
> >>>> > >> > > connector via the (connector- or worker-level)
> >>>> > >> "heartbeat.records.topic"
> >>>> > >> > > property. Since every constructor for the SourceRecord class
> >>>> [2]
> >>>> > >> > requires a
> >>>> > >> > > topic to be supplied, what will happen to that topic? Will it
> >>>> be
> >>>> > >> ignored?
> >>>> > >> > > If so, I think we should look for a cleaner solution.
> >>>> > >> > >
> >>>> > >> > > 9. A large concern raised in the discussion for KAFKA-3821
> was
> >>>> > >> > allowing
> >>>> > >> > > connectors to control the ordering of these special
> >>>> "offsets-only"
> >>>> > >> > > emissions and the regular source records returned from
> >>>> > >> SourceTask::poll.
> >>>> > >> > > Are we choosing to ignore that concern? If so, can you add
> >>>> this to
> >>>> > the
> >>>> > >> > > rejected alternatives section along with a rationale?
> >>>> > >> > >
> >>>> > >> > > 10. If, sometime in the future, we wanted to add
> >>>> framework-level
> >>>> > >> support
> >>>> > >> > > for sending heartbeat records that doesn't require connectors
> >>>> to
> >>>> > >> > implement
> >>>> > >> > > any new APIs (e.g., SourceTask::produceHeartbeatRecords), a
> >>>> lot of
> >>>> > >> this
> >>>> > >> > > would paint us into a corner design-wise. We'd have to think
> >>>> > carefully
> >>>> > >> > > about which property names would be used, how to account for
> >>>> > >> connectors
> >>>> > >> > > that have already implemented the
> >>>> > SourceTask::produceHeartbeatRecords
> >>>> > >> > > method, etc. In general, it seems like we're trying to solve
> >>>> two
> >>>> > >> > completely
> >>>> > >> > > different problems with this single KIP: adding
> framework-level
> >>>> > >> support
> >>>> > >> > for
> >>>> > >> > > emitting heartbeat records for source connectors, and
> allowing
> >>>> > source
> >>>> > >> > > connectors to emit offsets without also emitting source
> >>>> records. I
> >>>> > >> don't
> >>>> > >> > > mind addressing the two at the same time if the result is
> >>>> elegant
> >>>> > and
> >>>> > >> > > doesn't compromise on the solution for either problem, but
> that
> >>>> > >> doesn't
> >>>> > >> > > seem to be the case here. Of the two problems, could we
> >>>> describe one
> >>>> > >> as
> >>>> > >> > the
> >>>> > >> > > primary and one as the secondary? If so, we might consider
> >>>> dropping
> >>>> > >> the
> >>>> > >> > > secondary problem from this KIP and addressing it separately.
> >>>> > >> > >
> >>>> > >> > > [1] - https://issues.apache.org/jira/browse/KAFKA-3821
> >>>> > >> > > [2] -
> https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceRecord.html
> >>>> > >> > >
> >>>> > >> > > Cheers,
> >>>> > >> > >
> >>>> > >> > > Chris
> >>>> > >> > >
> >>>> > >> > > On Sat, Mar 25, 2023 at 11:18 PM Sagar <
> >>>> sagarmeansoc...@gmail.com>
> >>>> > >> > wrote:
> >>>> > >> > >
> >>>> > >> > > > Hi John,
> >>>> > >> > > >
> >>>> > >> > > > Thanks for taking a look at the KIP!
> >>>> > >> > > >
> >>>> > >> > > > The point about stream time not advancing in case of
> >>>> infrequent
> >>>> > >> updates
> >>>> > >> > > is
> >>>> > >> > > > an interesting one. I can imagine if the upstream producer
> >>>> to a
> >>>> > >> Kafka
> >>>> > >> > > > Streams application is a Source Connector which isn't
> sending
> >>>> > >> records
> >>>> > >> > > > frequently (due to the nature of the data ingestion for
> >>>> example),
> >>>> > >> then
> >>>> > >> > the
> >>>> > >> > > > downstream stream processing can run into the issues you
> >>>> > described
> >>>> > >> > > above.
> >>>> > >> > > >
> >>>> > >> > > > Which also brings me to the second point you made about how
> >>>> this
> >>>> > >> would
> >>>> > >> > be
> >>>> > >> > > > used by downstream consumers. IIUC, you are referring to
> the
> >>>> > >> consumers
> >>>> > >> > of
> >>>> > >> > > > the newly added topic, i.e. the heartbeat topic. In my mind,
> >>>> the
> >>>> > >> > heartbeat
> >>>> > >> > > > topic is an internal topic (similar to
> offsets/config/status
> >>>> topic
> >>>> > >> in
> >>>> > >> > > > connect), the main purpose of which is to trick the
> >>>> framework to
> >>>> > >> > produce
> >>>> > >> > > > records to the offsets topic and advance the offsets. Since
> >>>> every
> >>>> > >> > > connector
> >>>> > >> > > > could have a different definition of offsets (LSN, BinLogID
> >>>> etc. for
> >>>> > >> > > > example), that logic to determine what the heartbeat
> records
> >>>> > should
> >>>> > >> be
> >>>> > >> > > > would have to reside in the actual connector.
> >>>> > >> > > >
> >>>> > >> > > > Now that I think of it, it could very well be consumed by
> >>>> > downstream
> >>>> > >> > > > consumers/ Streams or Flink Applications and be further
> used
> >>>> for
> >>>> > >> some
> >>>> > >> > > > decision making. A very crude example could be let's say if
> >>>> the
> >>>> > >> > heartbeat
> >>>> > >> > > > records sent to the new heartbeat topic include timestamps,
> >>>> then
> >>>> > the
> >>>> > >> > > > downstream streams application can use that timestamp to
> >>>> close any
> >>>> > >> time
> >>>> > >> > > > windows. Having said that, it still appears to me that it's
> >>>> > outside
> >>>> > >> the
> >>>> > >> > > > scope of the Connect framework and is something which is
> >>>> difficult
> >>>> > >> to
> >>>> > >> > > > generalise because of the variety of Sources and the
> >>>> definitions
> >>>> > of
> >>>> > >> > > > offsets.
> >>>> > >> > > >
> >>>> > >> > > > But, I would still be more than happy to add this example
> if
> >>>> you
> >>>> > >> think
> >>>> > >> > it
> >>>> > >> > > > can be useful in getting a better understanding of the idea
> >>>> and
> >>>> > also
> >>>> > >> > its
> >>>> > >> > > > utility beyond connect. Please let me know!
> >>>> > >> > > >
> >>>> > >> > > > Thanks!
> >>>> > >> > > > Sagar.
> >>>> > >> > > >
> >>>> > >> > > >
> >>>> > >> > > > On Fri, Mar 24, 2023 at 7:22 PM John Roesler <
> >>>> vvcep...@apache.org
> >>>> > >
> >>>> > >> > > wrote:
> >>>> > >> > > >
> >>>> > >> > > > > Thanks for the KIP, Sagar!
> >>>> > >> > > > >
> >>>> > >> > > > > At first glance, this seems like a very useful feature.
> >>>> > >> > > > >
> >>>> > >> > > > > A common pain point in Streams is when upstream producers
> >>>> don't
> >>>> > >> send
> >>>> > >> > > > > regular updates and stream time cannot advance. This
> causes
> >>>> > >> > > > > stream-time-driven operations to appear to hang, like
> time
> >>>> > windows
> >>>> > >> > not
> >>>> > >> > > > > closing, suppressions not firing, etc.
> >>>> > >> > > > >
> >>>> > >> > > > > From your KIP, I have a good idea of how the feature
> would
> >>>> be
> >>>> > >> > > integrated
> >>>> > >> > > > > into connect, and it sounds good to me. I don't quite see
> >>>> how
> >>>> > >> > > downstream
> >>>> > >> > > > > clients, such as a downstream Streams or Flink
> >>>> application, or
> >>>> > >> users
> >>>> > >> > of
> >>>> > >> > > > the
> >>>> > >> > > > > Consumer would make use of this feature. Could you add
> some
> >>>> > >> examples
> >>>> > >> > of
> >>>> > >> > > > > that nature?
> >>>> > >> > > > >
> >>>> > >> > > > > Thank you,
> >>>> > >> > > > > -John
> >>>> > >> > > > >
> >>>> > >> > > > > On Fri, Mar 24, 2023, at 05:23, Sagar wrote:
> >>>> > >> > > > > > Hi All,
> >>>> > >> > > > > >
> >>>> > >> > > > > > Bumping the thread again.
> >>>> > >> > > > > >
> >>>> > >> > > > > > Sagar.
> >>>> > >> > > > > >
> >>>> > >> > > > > >
> >>>> > >> > > > > > On Fri, Mar 10, 2023 at 4:42 PM Sagar <
> >>>> > >> sagarmeansoc...@gmail.com>
> >>>> > >> > > > wrote:
> >>>> > >> > > > > >
> >>>> > >> > > > > >> Hi All,
> >>>> > >> > > > > >>
> >>>> > >> > > > > >> Bumping this discussion thread again.
> >>>> > >> > > > > >>
> >>>> > >> > > > > >> Thanks!
> >>>> > >> > > > > >> Sagar.
> >>>> > >> > > > > >>
> >>>> > >> > > > > >> On Thu, Mar 2, 2023 at 3:44 PM Sagar <
> >>>> > >> sagarmeansoc...@gmail.com>
> >>>> > >> > > > wrote:
> >>>> > >> > > > > >>
> >>>> > >> > > > > >>> Hi All,
> >>>> > >> > > > > >>>
> >>>> > >> > > > > >>> I wanted to create a discussion thread for KIP-910:
> >>>> > >> > > > > >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
> >>>> > >> > > > > >>>
> >>>> > >> > > > > >>> Thanks!
> >>>> > >> > > > > >>> Sagar.
> >>>> > >> > > > > >>>
> >>>> > >> > > > > >>
> >>>> > >> > > > >
> >>>> > >> > > >
> >>>> > >> > >
> >>>> > >> >
> >>>> > >>
> >>>> > >
> >>>> >
> >>>>
> >>>
>
