Hi Sagar,

I'm sorry, I'm still not convinced that this design solves the problem(s)
it sets out to solve in the best way possible. I tried to highlight this in
my last email:

> In general, it seems like we're trying to solve two completely different
> problems with this single KIP: adding framework-level support for emitting
> heartbeat records for source connectors, and allowing source connectors to
> emit offsets without also emitting source records. I don't mind addressing
> the two at the same time if the result is elegant and doesn't compromise on
> the solution for either problem, but that doesn't seem to be the case here.
> Of the two problems, could we describe one as the primary and one as the
> secondary? If so, we might consider dropping the secondary problem from
> this KIP and addressing it separately.

If we wanted to add support for heartbeat records, we could (and IMO
should) do that without requiring connectors to implement any new methods;
users would only need to adjust worker or connector configurations to
enable the feature.
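
To make that concrete, here's a rough sketch of the kind of thing I have
in mind. It's purely illustrative: the property names
("heartbeat.interval.ms", "heartbeat.records.topic") and all field and
method names here are assumptions, not existing API:

    // Hypothetical worker-side logic; no new connector-facing API required.
    private void maybeEmitHeartbeats() {
        long now = time.milliseconds();
        if (now - lastHeartbeatMs < heartbeatIntervalMs)
            return;
        lastHeartbeatMs = now;
        // Re-emit the most recently committed offset for each source
        // partition so that offsets keep advancing even when poll()
        // returns no records.
        for (Map.Entry<Map<String, Object>, Map<String, Object>> committed
                : lastCommittedOffsets.entrySet()) {
            toSend.add(new SourceRecord(committed.getKey(),
                    committed.getValue(), heartbeatTopic, null, null));
        }
    }

Everything above can be driven by worker- or connector-level configuration
alone.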

If we want to add support for connectors to emit offsets without
accompanying source records, we could (and IMO should) do that without
requiring users to manually enable that feature by adjusting worker or
connector configurations.
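
And to illustrate the second problem in isolation: one possible shape
(again, just a sketch and not a concrete proposal; treating a null topic
as an "offsets-only" marker is not current Connect behavior) would be to
let tasks return records like this from poll():

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.connect.source.SourceRecord;

    public class OffsetsOnlyExample {
        // A record with a null topic and null value would mean "commit
        // this source offset, but produce nothing to Kafka" (assumed
        // framework behavior, purely for illustration).
        static SourceRecord offsetsOnly(String file, long position) {
            Map<String, Object> sourcePartition =
                    Collections.singletonMap("filename", file);
            Map<String, Object> sourceOffset =
                    Collections.singletonMap("position", position);
            return new SourceRecord(sourcePartition, sourceOffset,
                    null, null, null);
        }
    }

A connector would opt in simply by returning such records; no worker or
connector configuration would be involved.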


I'm also not sure that it's worth preserving the current behavior that
offsets for records that have been filtered out via SMT are not committed.
I can't think of a case where this would be useful and there are obviously
plenty where it isn't. There's also a slight discrepancy in how these kinds
of records are treated by the Connect runtime now; if a record is dropped
because of an SMT, then its offset isn't committed, but if it's dropped
because exactly-once support is enabled and the connector chose to abort
the batch containing the record, then its offset is still committed. After
thinking carefully about the aborted transaction behavior, we realized that
it was fine to commit the offsets for those records, and I believe that the
same logic can be applied to any record that we're done trying to send to
Kafka (regardless of whether it was sent correctly, dropped due to producer
error, filtered via SMT, etc.).
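
In code terms, the change I'm describing would be small. Here's a
simplified sketch of the send loop in AbstractWorkerSourceTask (method and
field names are approximations of the runtime code, not exact):

    for (SourceRecord preTransformRecord : toSend) {
        SourceRecord record = transformationChain.apply(preTransformRecord);
        if (record == null) {
            // Filtered out by an SMT. Today this offset is silently
            // dropped; under the behavior I'm suggesting, we'd still mark
            // it committable, exactly as we already do for records in
            // aborted transactions.
            submittedRecords.submit(preTransformRecord).ack();
            continue;
        }
        sendRecord(record, preTransformRecord);
    }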

I also find the file-based source connector example a little confusing.
What about that kind of connector causes the offset for the last record of
a file to be treated differently? Is there anything different about
filtering that record via SMT vs. dropping it altogether because of an
asynchronous producer error with "errors.tolerance" set to "all"? And
finally, how would such a connector use the design proposed here?

Finally, I don't disagree that if there are other legitimate use cases that
would be helped by addressing KAFKA-3821, we should try to solve that issue
in the Kafka Connect framework instead of requiring individual connectors
to implement their own solutions. But the cognitive load added by the
design proposed here, for connector developers and Connect cluster
administrators alike, is too high to be justified by pointing to an
already-solved problem encountered by a single group of connectors (i.e.,
Debezium). This is why I think it's crucial that we identify realistic
cases where this feature would actually be useful, and right now, I don't
think any have been provided (at least, none that haven't already been
addressed or couldn't be addressed with much simpler changes).

Cheers,

Chris

On Tue, Apr 11, 2023 at 7:30 AM Sagar <sagarmeansoc...@gmail.com> wrote:

> Hi Chris,
>
> Thanks for your detailed feedback!
>
> nits: I have taken care of them now. Thanks for pointing those out.
>
> non-nits:
>
> 6) It seems (based on both the KIP and discussion on KAFKA-3821) that the
> > only use case for being able to emit offsets without also emitting source
> > records that's been identified so far is for CDC source connectors like
> > Debezium.
>
>
> I am aware of at least one more case where the non-production of offsets
> (due to the non-production of records) leads to the failure of connectors
> when the source purges the records of interest. This happens in file-based
> source connectors (like S3/blob storage): if the last record from a file
> is filtered out by an SMT, then the offset for that file's source
> partition is never committed, and eventually, when the file is deleted
> from the source and the connector is restarted for some reason, it fails.
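>
> To make the failure mode concrete, here is roughly what such a task
> produces (the topic, file name, and variable names below are illustrative
> only; imports from org.apache.kafka.connect.data and
> org.apache.kafka.connect.source elided):
>
>     // Each record carries the file name as the source partition and the
>     // read position as the source offset.
>     Map<String, Object> partition =
>             Collections.singletonMap("filename", "part-0001.csv");
>     Map<String, Object> offset = Collections.singletonMap("position", 1042L);
>     String lastLine = "final,row,of,the,file";
>     SourceRecord last = new SourceRecord(partition, offset, "output-topic",
>             Schema.STRING_SCHEMA, lastLine);
>     // If an SMT filters out this last record, offset 1042 is never
>     // committed. Once the file is purged upstream and the task restarts,
>     // it resumes from a stale offset and fails on the missing file.
>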
> Moreover, I feel the reason this support should be there in the Kafka
> Connect framework is that this is a restriction of the framework, and
> today the framework provides no support for getting around it. Every
> connector has its own way of handling offsets, and having each connector
> handle this restriction in its own way can get complex. Whether we choose
> to do it the way this KIP prescribes or some other way is up for debate,
> but IMHO the framework should provide a way of getting around this
> limitation.
>
> 7. If a task produces heartbeat records and source records that use the
> > same source partition, which offset will ultimately be committed?
>
>
> The idea is to add the records returned by `produceHeartbeatRecords` to
> the same `toSend` list within `AbstractWorkerSourceTask#execute`.
> `produceHeartbeatRecords` would be invoked before we make the `poll`
> call. Hence, the offsets committed would be in the same order in which
> the records are written. Note that the onus is on the connector
> implementation not to return records which can lead to data loss or data
> going out of order; the framework would just commit based on whatever is
> supplied. Also, AFAIK, two `normal` source records can share the same
> source partition, and they are committed in the order in which they are
> written.
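>
> Roughly, the merge would look like this (simplified; method and field
> names from AbstractWorkerSourceTask are approximated, not exact):
>
>     // Heartbeat records are fetched before poll() and added to the same
>     // toSend batch, so their offsets are committed in dispatch order
>     // along with the offsets of regular records.
>     List<SourceRecord> heartbeats = task.produceHeartbeatRecords();
>     List<SourceRecord> polled = poll();
>     toSend = new ArrayList<>(heartbeats);
>     if (polled != null)
>         toSend.addAll(polled);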
>
> 8. The SourceTask::produceHeartbeatRecords method returns a
> > List<SourceRecord>, and users can control the heartbeat topic for a
> > connector via the (connector- or worker-level) "heartbeat.records.topic"
> > property. Since every constructor for the SourceRecord class [2]
> requires a
> > topic to be supplied, what will happen to that topic? Will it be ignored?
> > If so, I think we should look for a cleaner solution.
>
>
> Sorry, I couldn't quite follow which topic will be ignored in this case.
>
> 9. A large concern raised in the discussion for KAFKA-3821 was about allowing
> > connectors to control the ordering of these special "offsets-only"
> > emissions and the regular source records returned from SourceTask::poll.
> > Are we choosing to ignore that concern? If so, can you add this to the
> > rejected alternatives section along with a rationale?
>
>
> One thing to note is that, for every connector, the condition to emit
> the heartbeat record is totally up to the connector. For example, a
> connector which is tracking transactions for an ordered log might not
> need to emit heartbeat records when the timer expires if there are open
> transactions, while a file-based connector, if the same file is being
> processed again and again due to an SMT or some other reason, can choose
> to emit a heartbeat record for that partition. The uber point here is
> that every connector has its own requirements and the framework can't
> really make an assumption about them. What the KIP is trying to do is
> provide a mechanism for the connector to commit new offsets. With this
> approach, as far as I can tell so far, there doesn't seem to be a case of
> out-of-order processing. If you have other concerns/thoughts I would be
> happy to know them.
>
> 10. If, sometime in the future, we wanted to add framework-level support
> > for sending heartbeat records that doesn't require connectors to
> implement
> > any new APIs...
>
>
> The main purpose of producing heartbeat records is to be able to emit
> offsets w/o any new records. We are using heartbeat records to solve the
> primary concern of offsets getting stalled. The reason for doing it that
> way was that once we get SourceRecords, the rest of the code is already
> in place to write them to a topic of interest and commit offsets, and
> that seemed the least invasive in terms of framework-level changes. If in
> the future we want to add framework-only heartbeat record support, then
> this would create confusion, as you pointed out. Do you think the choice
> of the name heartbeat records is creating confusion in this case? Maybe
> we can call these special records something else (not sure what at this
> point), which would then decouple the two logically and
> implementation-wise as well?
>
> Thanks!
> Sagar.
>
> On Tue, Mar 28, 2023 at 8:28 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
> > Hi Sagar,
> >
> > Thanks for the KIP! I have some thoughts.
> >
> > Nits:
> >
> > 1. Shouldn't KAFKA-3821 [1] be linked as the Jira ticket on the KIP? Or
> is
> > there a different ticket that should be associated with it?
> > 2. The current state is listed as "Draft". Considering it's been brought
> up
> > for discussion, maybe the KIP should be updated to "Discussion"?
> > 3. Can you add a link for the discussion thread to the KIP?
> > 4. The KIP states that "In this process, offsets are written at regular
> > intervals (driven by `offset.flush.interval.ms`)". This isn't strictly
> > accurate since, when exactly-once support is enabled, offset commits can
> > also be performed for each record batch (which is the default) or when
> > explicitly requested by the task instance (if the connector implements
> the
> > API to define its own transactions and the user has configured it to do
> > so). Maybe better to just say "Offsets are written periodically"?
> > 5. The description for the (per-connector) "heartbeat.records.topic"
> > property states that it is "Only applicable in distributed mode; in
> > standalone mode, setting this property will have no effect". Is this
> > correct?
> >
> > Non-nits:
> >
> > 6. It seems (based on both the KIP and discussion on KAFKA-3821) that the
> > only use case for being able to emit offsets without also emitting source
> > records that's been identified so far is for CDC source connectors like
> > Debezium. But Debezium already has support for this exact feature
> (emitting
> > heartbeat records that include offsets that cannot be associated with
> > other, "regular" source records). Why should we add this feature to Kafka
> > Connect when the problem it addresses is already solved in the set of
> > connectors that (it seems) would have any need for it, and the size of
> that
> > set is extremely small? If there are other practical use cases for
> > connectors that would benefit from this feature, please let me know.
> >
> > 7. If a task produces heartbeat records and source records that use the
> > same source partition, which offset will ultimately be committed?
> >
> > 8. The SourceTask::produceHeartbeatRecords method returns a
> > List<SourceRecord>, and users can control the heartbeat topic for a
> > connector via the (connector- or worker-level) "heartbeat.records.topic"
> > property. Since every constructor for the SourceRecord class [2]
> requires a
> > topic to be supplied, what will happen to that topic? Will it be ignored?
> > If so, I think we should look for a cleaner solution.
> >
> > 9. A large concern raised in the discussion for KAFKA-3821 was about
> allowing
> > connectors to control the ordering of these special "offsets-only"
> > emissions and the regular source records returned from SourceTask::poll.
> > Are we choosing to ignore that concern? If so, can you add this to the
> > rejected alternatives section along with a rationale?
> >
> > 10. If, sometime in the future, we wanted to add framework-level support
> > for sending heartbeat records that doesn't require connectors to
> implement
> > any new APIs (e.g., SourceTask::produceHeartbeatRecords), a lot of this
> > would paint us into a corner design-wise. We'd have to think carefully
> > about which property names would be used, how to account for connectors
> > that have already implemented the SourceTask::produceHeartbeatRecords
> > method, etc. In general, it seems like we're trying to solve two
> completely
> > different problems with this single KIP: adding framework-level support
> for
> > emitting heartbeat records for source connectors, and allowing source
> > connectors to emit offsets without also emitting source records. I don't
> > mind addressing the two at the same time if the result is elegant and
> > doesn't compromise on the solution for either problem, but that doesn't
> > seem to be the case here. Of the two problems, could we describe one as
> the
> > primary and one as the secondary? If so, we might consider dropping the
> > secondary problem from this KIP and addressing it separately.
> >
> > [1] - https://issues.apache.org/jira/browse/KAFKA-3821
> > [2] -
> > https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceRecord.html
> >
> > Cheers,
> >
> > Chris
> >
> > On Sat, Mar 25, 2023 at 11:18 PM Sagar <sagarmeansoc...@gmail.com>
> wrote:
> >
> > > Hi John,
> > >
> > > Thanks for taking a look at the KIP!
> > >
> > > The point about stream time not advancing in case of infrequent
> > > updates is an interesting one. I can imagine that if the upstream
> > > producer to a Kafka Streams application is a source connector which
> > > isn't sending records frequently (due to the nature of the data
> > > ingestion, for example), then the downstream stream processing can
> > > run into the issues you described above.
> > >
> > > Which also brings me to the second point you made about how this
> > > would be used by downstream consumers. IIUC, you are referring to the
> > > consumers of the newly added topic, i.e., the heartbeat topic. In my
> > > mind, the heartbeat topic is an internal topic (similar to the
> > > offsets/config/status topics in Connect), the main purpose of which
> > > is to trick the framework into producing records to the offsets topic
> > > and advancing the offsets. Since every connector could have a
> > > different definition of offsets (LSN, BinLogID, etc.), the logic to
> > > determine what the heartbeat records should be would have to reside
> > > in the actual connector.
> > >
> > > Now that I think of it, it could very well be consumed by downstream
> > > consumers / Streams or Flink applications and be further used for
> > > some decision making. A very crude example: if the heartbeat records
> > > sent to the new heartbeat topic include timestamps, then a downstream
> > > Streams application could use those timestamps to close any time
> > > windows. Having said that, it still appears to me that this is
> > > outside the scope of the Connect framework and is something which is
> > > difficult to generalise because of the variety of sources and the
> > > definitions of offsets.
> > >
> > > But I would still be more than happy to add this example if you
> > > think it can be useful in getting a better understanding of the idea
> > > and also its utility beyond Connect. Please let me know!
> > >
> > > Thanks!
> > > Sagar.
> > >
> > >
> > > On Fri, Mar 24, 2023 at 7:22 PM John Roesler <vvcep...@apache.org>
> > wrote:
> > >
> > > > Thanks for the KIP, Sagar!
> > > >
> > > > At first glance, this seems like a very useful feature.
> > > >
> > > > A common pain point in Streams is when upstream producers don't send
> > > > regular updates and stream time cannot advance. This causes
> > > > stream-time-driven operations to appear to hang, like time windows
> not
> > > > closing, suppressions not firing, etc.
> > > >
> > > > From your KIP, I have a good idea of how the feature would be
> > integrated
> > > > into connect, and it sounds good to me. I don't quite see how
> > downstream
> > > > clients, such as a downstream Streams or Flink application, or users
> of
> > > the
> > > > Consumer would make use of this feature. Could you add some examples
> of
> > > > that nature?
> > > >
> > > > Thank you,
> > > > -John
> > > >
> > > > On Fri, Mar 24, 2023, at 05:23, Sagar wrote:
> > > > > Hi All,
> > > > >
> > > > > Bumping the thread again.
> > > > >
> > > > > Sagar.
> > > > >
> > > > >
> > > > > On Fri, Mar 10, 2023 at 4:42 PM Sagar <sagarmeansoc...@gmail.com>
> > > wrote:
> > > > >
> > > > >> Hi All,
> > > > >>
> > > > >> Bumping this discussion thread again.
> > > > >>
> > > > >> Thanks!
> > > > >> Sagar.
> > > > >>
> > > > >> On Thu, Mar 2, 2023 at 3:44 PM Sagar <sagarmeansoc...@gmail.com>
> > > wrote:
> > > > >>
> > > > >>> Hi All,
> > > > >>>
> > > > >>> I wanted to create a discussion thread for KIP-910:
> > > > >>>
> > > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
> > > > >>>
> > > > >>> Thanks!
> > > > >>> Sagar.
> > > > >>>
> > > > >>
> > > >
> > >
> >
>
