Hi Akash,

Thanks for your question. When you say:

> the KafkaConsumer misses some of the records written into the Kafka source
> topic.

Do you mean that there are records in the source topic that are never
returned by Consumer#poll() or SourceTask#poll(), or never written to the
target topic by Kafka Connect?

That is the behavior I would expect if you are starting the consumer for
the first time, or after offsets were not committed, so that
auto.offset.reset=latest takes effect [1]. You could turn on DEBUG logging
for SubscriptionState, OffsetFetcher, or ConsumerCoordinator to watch what
the consumer is doing internally.
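
For example, if your worker uses the stock connect-log4j.properties, the
relevant loggers would look something like this (a sketch; adjust to however
your deployment configures logging):

  log4j.logger.org.apache.kafka.clients.consumer.internals.SubscriptionState=DEBUG
  log4j.logger.org.apache.kafka.clients.consumer.internals.OffsetFetcher=DEBUG
  log4j.logger.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator=DEBUG

With those loggers enabled you should be able to see when the group is
assigned partitions and when the consumer resets offsets because no committed
offset was found, which is exactly the window where records produced just
before the reset would be skipped with latest.
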
If you want the consumer to start at the beginning of the topic and read
every record, you need to use auto.offset.reset=earliest. If you want the
consumer to remember its progress and not re-process data, you will need to
have the consumer seek() to particular offsets, or commit offsets with
commitSync() or commitAsync() [2].
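
As a rough sketch of what that looks like on the consumer side (untested;
the broker, topic, and group names are placeholders, and the byte-array
deserializers are just an assumption about your data):

  import java.time.Duration;
  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.serialization.ByteArrayDeserializer;

  Properties props = new Properties();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-msk-broker:9092");
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-source-connector-group");
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
  // Read from the earliest offset when the group has no committed offset yet
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  // Commit manually so progress is only recorded after records are handed off
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

  KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
  // e.g. in SourceTask#start
  consumer.subscribe(Collections.singletonList("my-source-topic"));

  // e.g. in SourceTask#poll
  ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
  // ... convert the records to SourceRecords and return them ...
  // Commit so a restarted task resumes from here instead of resetting again
  consumer.commitSync();

Committing with commitSync() after the records are handed off trades a bit of
latency for not losing the position; commitAsync() avoids the blocking call at
the cost of weaker guarantees if the task fails before the commit completes.
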

Also, what you're doing sounds similar to the existing MirrorMaker2 tool
[3]; you could consider using it, or learning from how it is implemented.

Thanks,
Greg

[1] https://kafka.apache.org/documentation.html#consumerconfigs_auto.offset.reset
[2] https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
[3] https://kafka.apache.org/documentation.html#georeplication

On Wed, Jun 12, 2024 at 2:08 PM Akash Dhiman <akashdhiman...@gmail.com>
wrote:

> Hello,
> we have a use case where we use a KafkaConsumer in a SourceTask of a source
> connector to poll messages from an AWS MSK cluster.
>
> If we try to produce data into the source topic immediately after the
> connector gets into the running state, we sometimes notice that the
> KafkaConsumer misses some of the records written into the Kafka source
> topic. (Note that SourceTask#start involves subscribing to the topic and
> SourceTask#poll involves the actual KafkaConsumer.poll() call.)
>
> I hypothesised that this might be due to the KafkaConsumer taking time to
> find the offset for the topic, and given that we have the auto.offset.reset
> config set to latest, this is why it's happening. But I am unsure what
> observability I can use to confirm this (I have set the log level to
> ERROR). Can it happen that the Kafka connector is in the running state but
> its polling method, which basically uses KafkaConsumer.poll(), is still
> awaiting offset allocation? Is there a way to verify this in a more
> efficient manner?
>