Hi Sagar,

Thanks for the KIP! I have some thoughts.

Nits:

1. Shouldn't KAFKA-3821 [1] be linked as the Jira ticket on the KIP? Or is
there a different ticket that should be associated with it?
2. The current state is listed as "Draft". Considering it's been brought up
for discussion, maybe the KIP should be updated to "Discussion"?
3. Can you add a link for the discussion thread to the KIP?
4. The KIP states that "In this process, offsets are written at regular
intervals(driven by `offset.flush.interval.ms`)". This isn't strictly
accurate since, when exactly-once support is enabled, offset commits can
also be performed for each record batch (which is the default) or when
explicitly requested by the task instance (if the connector implements the
API to define its own transactions and the user has configured it to do
so). Maybe better to just say "Offsets are written periodically"?
5. The description for the (per-connector) "heartbeat.records.topic"
property states that it is "Only applicable in distributed mode; in
standalone mode, setting this property will have no effect". Is this
correct?

Non-nits:

6. It seems (based on both the KIP and discussion on KAFKA-3821) that the
only use case for being able to emit offsets without also emitting source
records that's been identified so far is for CDC source connectors like
Debezium. But Debezium already has support for this exact feature (emitting
heartbeat records that include offsets that cannot be associated with
other, "regular" source records). Why should we add this feature to Kafka
Connect when the problem it addresses is already solved in the set of
connectors that (it seems) would have any need for it, and that set is
extremely small? If there are other practical use cases for
connectors that would benefit from this feature, please let me know.

7. If a task produces heartbeat records and source records that use the
same source partition, which offset will ultimately be committed?
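
To make the question concrete, here's a toy model of the ambiguity. It
assumes (purely for illustration; the KIP doesn't specify this) that
per-partition offsets are tracked last-write-wins before a commit:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetCollision {
    // The worker tracks the latest offset per source partition before a
    // commit; a plain last-write-wins map models that (a simplification).
    static final Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>();

    public static void main(String[] args) {
        Map<String, Object> partition = Map.of("table", "t1");

        // Offset attached to a regular record returned from poll():
        offsets.put(partition, Map.of("lsn", 100L));

        // Offset attached to a heartbeat record (via the proposed
        // produceHeartbeatRecords()) for the same partition:
        offsets.put(partition, Map.of("lsn", 90L));

        // With last-write-wins semantics, the heartbeat offset silently
        // rewinds the partition's committed position.
        System.out.println(offsets.get(partition));
    }
}
```

Depending on how the two record streams are interleaved, the committed
offset could go backwards, so the KIP should spell out the intended
semantics.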

8. The SourceTask::produceHeartbeatRecords method returns a
List<SourceRecord>, and users can control the heartbeat topic for a
connector via the (connector- or worker-level) "heartbeat.records.topic"
property. Since every constructor for the SourceRecord class [2] requires a
topic to be supplied, what will happen to the topic in the records returned
from that method? Will it be ignored? If so, I think we should look for a
cleaner solution.
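
To illustrate the clash in point 8 (a sketch; SourceRecord is stood in by
a simplified record class, and all names here are made up):

```java
import java.util.List;

public class HeartbeatTopicQuestion {
    // Simplified stand-in for org.apache.kafka.connect.source.SourceRecord;
    // the only point is that every real constructor requires a topic.
    record SourceRecord(String topic, Object value) {}

    // Hypothetical implementation of the proposed API: the constructor
    // forces the connector to pick a topic here...
    static List<SourceRecord> produceHeartbeatRecords() {
        return List.of(new SourceRecord("topic-chosen-by-connector", "heartbeat"));
    }

    public static void main(String[] args) {
        // ...while the configuration independently names one via the
        // proposed heartbeat.records.topic property:
        String configuredTopic = "topic-from-config";

        SourceRecord record = produceHeartbeatRecords().get(0);
        // Two competing answers for where this record should go:
        System.out.println(record.topic() + " vs. " + configuredTopic);
    }
}
```

If the record's topic is always overridden by the configured one, the
constructor argument becomes misleading dead weight.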

9. A major concern raised in the discussion for KAFKA-3821 was allowing
connectors to control the ordering of these special "offsets-only"
emissions relative to the regular source records returned from
SourceTask::poll.
Are we choosing to ignore that concern? If so, can you add this to the
rejected alternatives section along with a rationale?

10. If, sometime in the future, we wanted to add framework-level support
for sending heartbeat records that doesn't require connectors to implement
any new APIs (e.g., SourceTask::produceHeartbeatRecords), a lot of this
would paint us into a corner design-wise. We'd have to think carefully
about which property names would be used, how to account for connectors
that have already implemented the SourceTask::produceHeartbeatRecords
method, etc. In general, it seems like we're trying to solve two completely
different problems with this single KIP: adding framework-level support for
emitting heartbeat records for source connectors, and allowing source
connectors to emit offsets without also emitting source records. I don't
mind addressing the two at the same time if the result is elegant and
doesn't compromise on the solution for either problem, but that doesn't
seem to be the case here. Of the two problems, could we describe one as the
primary and one as the secondary? If so, we might consider dropping the
secondary problem from this KIP and addressing it separately.

[1] - https://issues.apache.org/jira/browse/KAFKA-3821
[2] -
https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceRecord.html

Cheers,

Chris

On Sat, Mar 25, 2023 at 11:18 PM Sagar <sagarmeansoc...@gmail.com> wrote:

> Hi John,
>
> Thanks for taking a look at the KIP!
>
> The point about stream time not advancing in case of infrequent updates is
> an interesting one. I can imagine if the upstream producer to a Kafka
> Streams application is a Source Connector which isn't sending records
> frequently (due to the nature of the data ingestion, for example), then the
> downstream stream processing can land into the issues you described above.
>
> Which also brings me to the second point you made about how this would be
> used by downstream consumers. IIUC, you are referring to the consumers of
> the newly added topic i.e the heartbeat topic. In my mind, the heartbeat
> topic is an internal topic (similar to offsets/config/status topic in
> connect), the main purpose of which is to trick the framework into
> producing records to the offsets topic and advancing the offsets. Since
> every connector could have a different definition of offsets (LSN,
> BinLogID, etc. for
> example), that logic to determine what the heartbeat records should be
> would have to reside in the actual connector.
>
> Now that I think of it, it could very well be consumed by downstream
> consumers such as Streams or Flink applications and be further used for
> some decision making. A very crude example: if the heartbeat
> records sent to the new heartbeat topic include timestamps, then the
> downstream streams application can use that timestamp to close any time
> windows. Having said that, it still appears to me that it's outside the
> scope of the Connect framework and is something which is difficult to
> generalise because of the variety of Sources and the definitions of
> offsets.
>
> But, I would still be more than happy to add this example if you think it
> can be useful in getting a better understanding of the idea and also its
> utility beyond connect. Please let me know!
>
> Thanks!
> Sagar.
>
>
> On Fri, Mar 24, 2023 at 7:22 PM John Roesler <vvcep...@apache.org> wrote:
>
> > Thanks for the KIP, Sagar!
> >
> > At first glance, this seems like a very useful feature.
> >
> > A common pain point in Streams is when upstream producers don't send
> > regular updates and stream time cannot advance. This causes
> > stream-time-driven operations to appear to hang, like time windows not
> > closing, suppressions not firing, etc.
> >
> > From your KIP, I have a good idea of how the feature would be integrated
> > into connect, and it sounds good to me. I don't quite see how downstream
> > clients, such as a downstream Streams or Flink application, or users of
> > the
> > Consumer would make use of this feature. Could you add some examples of
> > that nature?
> >
> > Thank you,
> > -John
> >
> > On Fri, Mar 24, 2023, at 05:23, Sagar wrote:
> > > Hi All,
> > >
> > > Bumping the thread again.
> > >
> > > Sagar.
> > >
> > >
> > > On Fri, Mar 10, 2023 at 4:42 PM Sagar <sagarmeansoc...@gmail.com>
> > > wrote:
> > >
> > >> Hi All,
> > >>
> > >> Bumping this discussion thread again.
> > >>
> > >> Thanks!
> > >> Sagar.
> > >>
> > >> On Thu, Mar 2, 2023 at 3:44 PM Sagar <sagarmeansoc...@gmail.com>
> > > wrote:
> > >>
> > >>> Hi All,
> > >>>
> > >>> I wanted to create a discussion thread for KIP-910:
> > >>>
> > >>>
> > >>>
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
> > >>>
> > >>> Thanks!
> > >>> Sagar.
> > >>>
> > >>
> >
>
