One thing I forgot to mention in my previous email was that the reason I
chose to include the opt-in behaviour via configs was that the users of the
connector know their workload patterns. If the workload is such that the
 connector would receive regular valid updates then there’s ideally no need
for moving offsets since it would update automatically.

This way they aren’t forced to use this feature and can use it only when
the workload is expected to be batchy or not frequent.

Thanks!
Sagar.


On Fri, 14 Apr 2023 at 5:32 PM, Sagar <sagarmeansoc...@gmail.com> wrote:

> Hi Chris,
>
> Thanks for following up on the response. Sharing my thoughts further:
>
> If we want to add support for connectors to emit offsets without
>> accompanying source records, we could (and IMO should) do that without
>> requiring users to manually enable that feature by adjusting worker or
>> connector configurations.
>
>
> With the current KIP design, I have tried to implement this in an opt-in
> manner via configs. I guess what you are trying to say is that this doesn't
> need a config of it's own and instead could be part of the poll ->
> transform etc -> produce -> commit cycle. That way, the users don't need to
> set any config and if the connector supports moving offsets w/o producing
> SourceRecords, it should happen automatically. Is that correct? If that
> is the concern, then I can think of not exposing a config and try to make
> this process automatically. That should ease the load on connector users,
> but your point about cognitive load on Connector developers, I am still not
> sure how to address that. The offsets are privy to a connector and the
> framework at best can provide hooks to the tasks to update their offsets.
> Connector developers would still have to consider all cases before updating
> offsets.  And if I ignore the heartbeat topic and heartbeat interval ms
> configs, then what the KIP proposes currently isn't much different in that
> regard. Just that it produces a List of SourceRecord which can be changed
> to a Map of SourcePartition and their offsets if you think that would
> simplify things. Are there other cases in your mind which need addressing?
>
> Here's my take on the usecases:
>
>    1. Regarding the example about SMTs with Object Storage based
>    connectors, it was one of the scenarios identified. We have some connectors
>    that rely on the offsets topic to check if the next batch of files should
>    be processed and because of filtering of the last record from the files,
>    the eof supposedly is  never reached and the connector can't commit offsets
>    for that source partition(file). If there was a mechanism to update offsets
>    for such a source file, then with some moderately complex state tracking,
>    the connector can mark that file as processed and proceed.
>    2. There's another use case with the same class of connectors where if
>    a file is malformed, then the connector couldn't produce any offsets
>    because the file couldn't get processed completely. To handle such cases,
>    the connector developers have introduced a dev/null sort of topic where
>    they produce a record to this corrupted file topic and move the offset
>    somehow. This topic ideally isn't needed and with a mechanism to update
>    offsets would have helped in this case as well.
>    3. Coming to CDC based connectors,
>       1. We had a similar issue with Oracle CDC source connector and
>       needed to employ the same heartbeat mechanism to get around it.
>       2. MongoDB CDC source Connector  has employed the same heartbeat
>       mechanism Check `heartbeat.interval.ms` here (
>       
> https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/error-handling/
>       ).
>       3. Another CDC connector for ScyllaDB employs a similar mechanism.
>       
> https://github.com/scylladb/scylla-cdc-source-connector/search?q=heartbeat
>       4. For CDC based connectors, you could argue that these connectors
>       have been able to solve this error then why do we need framework level
>       support. But the point I am trying to make is that this limitation from 
> the
>       framework is forcing CDC connector developers to implement per-connector
>       solutions/hacks(at times). And there could always be more CDC 
> connectors in
>       the pipeline forcing them to take a similar route as well.
>    4. There's also a case at times with CDC source connectors which are
>    REST Api / Web Service based(Zendesk Source Connector for example) . These
>    connectors typically use timestamps from the responses as offsets. If
>    there's a long period of inactivity wherein the API invocations don't
>    return any data, then the offsets won't move and the connector would keep
>    using the same timestamp that it received from the last non-empty response.
>    If this period of inactivity keeps growing, and the API imposes any limits
>    on how far back we can go in terms of window start, then this could
>    potentially be a problem. In this case even though the connector was caught
>    up with all the responses, it may need to snapshot again. In this case
>    updating offsets can easily help since all the connector needs to do is to
>    move the timestamp which would move the offset inherently.
>
> I still believe that this is something the framework should support OOB
> irrespective of whether the connectors have been able to get around this
> restriction or not.
>
> Lastly, about your comments here:
>
> I'm also not sure that it's worth preserving the current behavior that
>> offsets for records that have been filtered out via SMT are not committed.
>
>
> Let me know if we need a separate JIRA to track this? This somehow didn't
> look related to this discussion.
>
> Thanks!
> Sagar.
>
>
> On Wed, Apr 12, 2023 at 9:34 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
>> Hi Sagar,
>>
>> I'm sorry, I'm still not convinced that this design solves the problem(s)
>> it sets out to solve in the best way possible. I tried to highlight this
>> in
>> my last email:
>>
>> > In general, it seems like we're trying to solve two completely different
>> problems with this single KIP: adding framework-level support for emitting
>> heartbeat records for source connectors, and allowing source connectors to
>> emit offsets without also emitting source records. I don't mind addressing
>> the two at the same time if the result is elegant and doesn't compromise
>> on
>> the solution for either problem, but that doesn't seem to be the case
>> here.
>> Of the two problems, could we describe one as the primary and one as the
>> secondary? If so, we might consider dropping the secondary problem from
>> this KIP and addressing it separately.
>>
>> If we wanted to add support for heartbeat records, we could (and IMO
>> should) do that without requiring connectors to implement any new methods
>> and only require adjustments to worker or connector configurations by
>> users
>> in order to enable that feature.
>>
>> If we want to add support for connectors to emit offsets without
>> accompanying source records, we could (and IMO should) do that without
>> requiring users to manually enable that feature by adjusting worker or
>> connector configurations.
>>
>>
>> I'm also not sure that it's worth preserving the current behavior that
>> offsets for records that have been filtered out via SMT are not committed.
>> I can't think of a case where this would be useful and there are obviously
>> plenty where it isn't. There's also a slight discrepancy in how these
>> kinds
>> of records are treated by the Connect runtime now; if a record is dropped
>> because of an SMT, then its offset isn't committed, but if it's dropped
>> because exactly-once support is enabled and the connector chose to abort
>> the batch containing the record, then its offset is still committed. After
>> thinking carefully about the aborted transaction behavior, we realized
>> that
>> it was fine to commit the offsets for those records, and I believe that
>> the
>> same logic can be applied to any record that we're done trying to send to
>> Kafka (regardless of whether it was sent correctly, dropped due to
>> producer
>> error, filtered via SMT, etc.).
>>
>> I also find the file-based source connector example a little confusing.
>> What about that kind of connector causes the offset for the last record of
>> a file to be treated differently? Is there anything different about
>> filtering that record via SMT vs. dropping it altogether because of an
>> asynchronous producer error with "errors.tolerance" set to "all"? And
>> finally, how would such a connector use the design proposed here?
>>
>> Finally, I don't disagree that if there are other legitimate use cases
>> that
>> would be helped by addressing KAFKA-3821, we should try to solve that
>> issue
>> in the Kafka Connect framework instead of requiring individual connectors
>> to implement their own solutions. But the cognitive load added by the
>> design proposed here, for connector developers and Connect cluster
>> administrators alike, costs too much to justify by pointing to an
>> already-solved problem encountered by a single group of connectors (i.e.,
>> Debezium). This is why I think it's crucial that we identify realistic
>> cases where this feature would actually be useful, and right now, I don't
>> think any have been provided (at least, not ones that have already been
>> addressed or could be addressed with much simpler changes).
>>
>> Cheers,
>>
>> Chris
>>
>> On Tue, Apr 11, 2023 at 7:30 AM Sagar <sagarmeansoc...@gmail.com> wrote:
>>
>> > Hi Chris,
>> >
>> > Thanks for your detailed feedback!
>> >
>> > nits: I have taken care of them now. Thanks for pointing those out.
>> >
>> > non-nits:
>> >
>> > 6) It seems (based on both the KIP and discussion on KAFKA-3821) that
>> the
>> > > only use case for being able to emit offsets without also emitting
>> source
>> > > records that's been identified so far is for CDC source connectors
>> like
>> > > Debezium.
>> >
>> >
>> > I am aware of atleast one more case where the non production of offsets
>> > (due to non production of records ) leads to the failure of connectors
>> when
>> > the source purges the records of interest. This happens in File based
>> > source connectors  (like s3/blob storage ) in which if the last record
>> from
>> > a file is fiterterd due to an SMT, then that particular file is never
>> > committed to the source partition and eventually when the file is
>> deleted
>> > from the source and the connector is restarted due to some reason, it
>> > fails.
>> > Moreover, I feel the reason this support should be there in the Kafka
>> > Connect framework is because this is a restriction of the framework and
>> > today the framework provides no support for getting around this
>> limitation.
>> > Every connector has it's own way of handling offsets and having each
>> > connector handle this restriction in its own way can make it complex.
>> > Whether we choose to do it the way this KIP prescribes or any other way
>> is
>> > up for debate but IMHO, the framework should provide a way of
>> > getting around this limitation.
>> >
>> > 7. If a task produces heartbeat records and source records that use the
>> > > same source partition, which offset will ultimately be committed?
>> >
>> >
>> > The idea is to add the records returned by the `produceHeartbeatRecords`
>> > to  the same `toSend` list within `AbstractWorkerSourceTask#execute`.
>> The
>> > `produceHeartbeatRecords` would be invoked before we make the `poll`
>> call.
>> > Hence, the offsets committed would be in the same order in which they
>> would
>> > be written. Note that, the onus is on the Connector implementation to
>> not
>> > return records which can lead to data loss or data going out of order.
>> The
>> > framework would just commit based on whatever is supplied. Also, AFAIK,
>> 2
>> > `normal` source records can also produce the same source partitions and
>> > they are committed in the order in which they are written.
>> >
>> > 8. The SourceTask::produceHeartbeatRecords method returns a
>> > > List<SourceRecord>, and users can control the heartbeat topic for a
>> > > connector via the (connector- or worker-level)
>> "heartbeat.records.topic"
>> > > property. Since every constructor for the SourceRecord class [2]
>> > requires a
>> > > topic to be supplied, what will happen to that topic? Will it be
>> ignored?
>> > > If so, I think we should look for a cleaner solution.
>> >
>> >
>> > Sorry, I couldn't quite follow which topic will be ignored in this case.
>> >
>> > 9. A large concern raised in the discussion for KAFKA-3821 was the
>> allowing
>> > > connectors to control the ordering of these special "offsets-only"
>> > > emissions and the regular source records returned from
>> SourceTask::poll.
>> > > Are we choosing to ignore that concern? If so, can you add this to the
>> > > rejected alternatives section along with a rationale?
>> >
>> >
>> > One thing to note is that the for every connector, the condition to emit
>> > the heartbeat record is totally up to the connector, For example, for a
>> > connector which is tracking transactions for an ordered log, if there
>> are
>> > open transactions, it might not need to emit heartbeat records when the
>> > timer expires while for file based connectors, if the same file is being
>> > processed again and again due to an SMT or some other reasons, then it
>> can
>> > choose to emit that partition. The uber point here is that every
>> connector
>> > has it's own requirements and the framework can't really make an
>> assumption
>> > about it. What the KIP is trying to do is to provide a mechanism to the
>> > connector to commit new offsets. With this approach, as far as I can
>> think
>> > so far, there doesn't seem to be a case of out of order processing. If
>> you
>> > have other concerns/thoughts I would be happy to know them.
>> >
>> > 10. If, sometime in the future, we wanted to add framework-level support
>> > > for sending heartbeat records that doesn't require connectors to
>> > implement
>> > > any new APIs...
>> >
>> >
>> > The main purpose of producing heartbeat records is to be able to emit
>> > offsets w/o any new records. We are using heartbeat records to solve the
>> > primary concern of offsets getting stalled. The reason to do that was
>> once
>> > we get SourceRecords, then the rest of the code is already in place to
>> > write it to a topic of interest and commit offsets and that seemed the
>> most
>> > non invasive in terms of framework level changes. If in the future we
>> want
>> > to do a framework-only heartbeat record support, then this would create
>> > confusion as you pointed out. Do you think the choice of the name
>> heartbeat
>> > records is creating confusion in this case? Maybe we can call these
>> special
>> > records something else (not sure what at this point) which would then
>> > decouple the 2 logically and implementation wise as well?
>> >
>> > Thanks!
>> > Sagar.
>> >
>> > On Tue, Mar 28, 2023 at 8:28 PM Chris Egerton <chr...@aiven.io.invalid>
>> > wrote:
>> >
>> > > Hi Sagar,
>> > >
>> > > Thanks for the KIP! I have some thoughts.
>> > >
>> > > Nits:
>> > >
>> > > 1. Shouldn't KAFKA-3821 [1] be linked as the Jira ticket on the KIP?
>> Or
>> > is
>> > > there a different ticket that should be associated with it?
>> > > 2. The current state is listed as "Draft". Considering it's been
>> brought
>> > up
>> > > for discussion, maybe the KIP should be updated to "Discussion"?
>> > > 3. Can you add a link for the discussion thread to the KIP?
>> > > 4. The KIP states that "In this process, offsets are written at
>> regular
>> > > intervals(driven by `offset.flush.interval.ms`)". This isn't strictly
>> > > accurate since, when exactly-once support is enabled, offset commits
>> can
>> > > also be performed for each record batch (which is the default) or when
>> > > explicitly requested by the task instance (if the connector implements
>> > the
>> > > API to define its own transactions and the user has configured it to
>> do
>> > > so). Maybe better to just say "Offsets are written periodically"?
>> > > 5. The description for the (per-connector) "heartbeat.records.topic "
>> > > property states that it is "Only applicable in distributed mode; in
>> > > standalone mode, setting this property will have no effect". Is this
>> > > correct?
>> > >
>> > > Non-nits:
>> > >
>> > > 6. It seems (based on both the KIP and discussion on KAFKA-3821) that
>> the
>> > > only use case for being able to emit offsets without also emitting
>> source
>> > > records that's been identified so far is for CDC source connectors
>> like
>> > > Debezium. But Debezium already has support for this exact feature
>> > (emitting
>> > > heartbeat records that include offsets that cannot be associated with
>> > > other, "regular" source records). Why should we add this feature to
>> Kafka
>> > > Connect when the problem it addresses is already solved in the set
>> > > connectors that (it seems) would have any need for it, and the size of
>> > that
>> > > set is extremely small? If there are other practical use cases for
>> > > connectors that would benefit from this feature, please let me know.
>> > >
>> > > 7. If a task produces heartbeat records and source records that use
>> the
>> > > same source partition, which offset will ultimately be committed?
>> > >
>> > > 8. The SourceTask::produceHeartbeatRecords method returns a
>> > > List<SourceRecord>, and users can control the heartbeat topic for a
>> > > connector via the (connector- or worker-level)
>> "heartbeat.records.topic"
>> > > property. Since every constructor for the SourceRecord class [2]
>> > requires a
>> > > topic to be supplied, what will happen to that topic? Will it be
>> ignored?
>> > > If so, I think we should look for a cleaner solution.
>> > >
>> > > 9. A large concern raised in the discussion for KAFKA-3821 was the
>> > allowing
>> > > connectors to control the ordering of these special "offsets-only"
>> > > emissions and the regular source records returned from
>> SourceTask::poll.
>> > > Are we choosing to ignore that concern? If so, can you add this to the
>> > > rejected alternatives section along with a rationale?
>> > >
>> > > 10. If, sometime in the future, we wanted to add framework-level
>> support
>> > > for sending heartbeat records that doesn't require connectors to
>> > implement
>> > > any new APIs (e.g., SourceTask::produceHeartbeatRecords), a lot of
>> this
>> > > would paint us into a corner design-wise. We'd have to think carefully
>> > > about which property names would be used, how to account for
>> connectors
>> > > that have already implemented the SourceTask::produceHeartbeatRecords
>> > > method, etc. In general, it seems like we're trying to solve two
>> > completely
>> > > different problems with this single KIP: adding framework-level
>> support
>> > for
>> > > emitting heartbeat records for source connectors, and allowing source
>> > > connectors to emit offsets without also emitting source records. I
>> don't
>> > > mind addressing the two at the same time if the result is elegant and
>> > > doesn't compromise on the solution for either problem, but that
>> doesn't
>> > > seem to be the case here. Of the two problems, could we describe one
>> as
>> > the
>> > > primary and one as the secondary? If so, we might consider dropping
>> the
>> > > secondary problm from this KIP and addressing it separately.
>> > >
>> > > [1] - https://issues.apache.org/jira/browse/KAFKA-3821
>> > > [2] -
>> > >
>> > >
>> >
>> https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceRecord.html
>> > >
>> > > Cheers,
>> > >
>> > > Chris
>> > >
>> > > On Sat, Mar 25, 2023 at 11:18 PM Sagar <sagarmeansoc...@gmail.com>
>> > wrote:
>> > >
>> > > > Hi John,
>> > > >
>> > > > Thanks for taking. look at the KIP!
>> > > >
>> > > > The point about stream time not advancing in case of infrequent
>> updates
>> > > is
>> > > > an interesting one. I can imagine if the upstream producer to a
>> Kafka
>> > > > Streams application is a Source Connector which isn't sending
>> records
>> > > > frequently(due to the nature of the data ingestion for example),
>> then
>> > the
>> > > > downstream stream processing can land into the issues you described
>> > > above.
>> > > >
>> > > > Which also brings me to the second point you made about how this
>> would
>> > be
>> > > > used by downstream consumers. IIUC, you are referring to the
>> consumers
>> > of
>> > > > the newly added topic i.e the heartbeat topic. In my mind, the
>> > heartbeat
>> > > > topic is an internal topic (similar to offsets/config/status topic
>> in
>> > > > connect), the main purpose of which is to trick the framework to
>> > produce
>> > > > records to the offsets topic and advance the offsets. Since every
>> > > connector
>> > > > could have a different definition of offsets(LSN, BinLogID etc for
>> > > > example), that logic to determine what the heartbeat records should
>> be
>> > > > would have to reside in the actual connector.
>> > > >
>> > > > Now that I think of it, it could very well be consumed by downstream
>> > > > consumers/ Streams or Flink Applications and be further used for
>> some
>> > > > decision making. A very crude example could be let's say if the
>> > heartbeat
>> > > > records sent to the new heartbeat topic include timestamps, then the
>> > > > downstream streams application can use that timestamp to close any
>> time
>> > > > windows. Having said that, it still appears to me that it's outside
>> the
>> > > > scope of the Connect framework and is something which is difficult
>> to
>> > > > generalise because of the variety of Sources and the definitions of
>> > > > offsets.
>> > > >
>> > > > But, I would still be more than happy to add this example if you
>> think
>> > it
>> > > > can be useful in getting a better understanding of the idea and also
>> > its
>> > > > utility beyond connect. Please let me know!
>> > > >
>> > > > Thanks!
>> > > > Sagar.
>> > > >
>> > > >
>> > > > On Fri, Mar 24, 2023 at 7:22 PM John Roesler <vvcep...@apache.org>
>> > > wrote:
>> > > >
>> > > > > Thanks for the KIP, Sagar!
>> > > > >
>> > > > > At first glance, this seems like a very useful feature.
>> > > > >
>> > > > > A common pain point in Streams is when upstream producers don't
>> send
>> > > > > regular updates and stream time cannot advance. This causes
>> > > > > stream-time-driven operations to appear to hang, like time windows
>> > not
>> > > > > closing, suppressions not firing, etc.
>> > > > >
>> > > > > From your KIP, I have a good idea of how the feature would be
>> > > integrated
>> > > > > into connect, and it sounds good to me. I don't quite see how
>> > > downstream
>> > > > > clients, such as a downstream Streams or Flink application, or
>> users
>> > of
>> > > > the
>> > > > > Consumer would make use of this feature. Could you add some
>> examples
>> > of
>> > > > > that nature?
>> > > > >
>> > > > > Thank you,
>> > > > > -John
>> > > > >
>> > > > > On Fri, Mar 24, 2023, at 05:23, Sagar wrote:
>> > > > > > Hi All,
>> > > > > >
>> > > > > > Bumping the thread again.
>> > > > > >
>> > > > > > Sagar.
>> > > > > >
>> > > > > >
>> > > > > > On Fri, Mar 10, 2023 at 4:42 PM Sagar <
>> sagarmeansoc...@gmail.com>
>> > > > wrote:
>> > > > > >
>> > > > > >> Hi All,
>> > > > > >>
>> > > > > >> Bumping this discussion thread again.
>> > > > > >>
>> > > > > >> Thanks!
>> > > > > >> Sagar.
>> > > > > >>
>> > > > > >> On Thu, Mar 2, 2023 at 3:44 PM Sagar <
>> sagarmeansoc...@gmail.com>
>> > > > wrote:
>> > > > > >>
>> > > > > >>> Hi All,
>> > > > > >>>
>> > > > > >>> I wanted to create a discussion thread for KIP-910:
>> > > > > >>>
>> > > > > >>>
>> > > > > >>>
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>> > > > > >>>
>> > > > > >>> Thanks!
>> > > > > >>> Sagar.
>> > > > > >>>
>> > > > > >>
>> > > > >
>> > > >
>> > >
>> >
>>
>

Reply via email to