Hi Akash,

Thanks for your question. When you say:

> the KafkaConsumer misses some of the records written into the Kafka source topic.

Do you mean that there are records in the source topic that are never returned by Consumer#poll() or SourceTask#poll(), and never written to the target topic by Kafka Connect? That is the behavior I would expect if the consumer is starting up for the first time, or has no committed offsets, and the auto.offset.reset=latest policy kicks in [1]. You could turn on more verbose logging for SubscriptionState, OffsetFetcher, or ConsumerCoordinator to watch what the consumer is doing internally.

If you want the consumer to start at the beginning of the topic and read every record, you need to use auto.offset.reset=earliest. If you want it to remember its progress and not re-process data, you will need to have the consumer seek() to particular offsets, or use commitSync() or commitAsync() [2].

Also, what you're doing sounds similar to the existing MirrorMaker2 tool [3]; you could consider using it, or learning from how it is implemented.

Thanks,
Greg

[1] https://kafka.apache.org/documentation.html#consumerconfigs_auto.offset.reset
[2] https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
[3] https://kafka.apache.org/documentation.html#georeplication

On Wed, Jun 12, 2024 at 2:08 PM Akash Dhiman <akashdhiman...@gmail.com> wrote:
> Hello,
>
> We have a use case where we use a KafkaConsumer in a SourceTask of a source
> connector to poll messages from an AWS MSK cluster.
>
> If we try to produce data into the source topic immediately after the
> connector gets into the RUNNING state, we sometimes notice that the
> KafkaConsumer misses some of the records written into the Kafka source
> topic. (Note that SourceTask#start involves subscribing to the topic, and
> SourceTask#poll involves the actual KafkaConsumer.poll() call.)
>
> I hypothesised that this might be because the KafkaConsumer takes time to
> find the offsets for the topic, and given that we have the auto.offset.reset
> config set to latest, that is why it happens, but I am unsure what
> observability I can use to confirm this (I have the log level set to ERROR).
> Can it happen that the Kafka connector is in the RUNNING state but its poll
> method, which basically uses KafkaConsumer.poll(), is still awaiting offset
> assignment? Is there a way to verify this in a more efficient manner?
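For reference, here is a minimal sketch of a consumer configured along the lines Greg describes, with auto.offset.reset=earliest and an explicit commitSync() after each batch. The bootstrap servers, group id, and topic name are placeholders, and forwarding the records to a target topic is left out:

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class EarliestCommittingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder connection details -- replace with your MSK bootstrap servers and group id.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-source-connector-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the beginning of the topic when there is no committed offset,
        // instead of the default "latest", which skips records produced before the
        // consumer's position is initialized.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Commit offsets explicitly so progress is remembered across restarts.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("source-topic")); // placeholder topic name
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Forward the record to the target topic here (elided in this sketch).
                    System.out.printf("offset=%d key=%s%n", record.offset(), record.key());
                }
                if (!records.isEmpty()) {
                    // Synchronously commit the offsets returned by the last poll().
                    consumer.commitSync();
                }
            }
        }
    }
}

To watch the offset lookup itself, you can raise the log level for the consumer internals (SubscriptionState, OffsetFetcher, and ConsumerCoordinator live under org.apache.kafka.clients.consumer.internals). Assuming a log4j setup like the one shipped with Connect, a line such as log4j.logger.org.apache.kafka.clients.consumer.internals=DEBUG in the Connect log4j properties file should surface the offset reset and fetch activity.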
> kafkaconsumer misses some of the records written into the kafka source topic. Do you mean that there are records in the source topic that are never returned by Consumer#poll(), SourceTask#poll(), or written to the target topic by KafkaConnect? That is the behavior I would expect if you are starting up the consumer for the first time, or after not committing offsets, and auto.offset.reset=latest triggers [1]. You could turn on logging for the SubscriptionState, OffsetFetcher, or ConsumerCoordinator to watch what the Consumer is doing internally. If you want it to start at the beginning of the topic and read every record, you need to use auto.offset.reset=earliest. If you want the consumer to remember its progress and not re-process data, you will need to have the consumer seek() to particular offsets, or use commitSync() or commitAsync() [2]. Also what you're doing sounds similar to the existing MirrorMaker2 tool [3], you can consider using it, or learning from how it is implemented. Thanks, Greg [1] https://kafka.apache.org/documentation.html#consumerconfigs_auto.offset.reset [2] https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html [3] https://kafka.apache.org/documentation.html#georeplication On Wed, Jun 12, 2024 at 2:08 PM Akash Dhiman <akashdhiman...@gmail.com> wrote: > Hello, > we have a usecase where we use kafkaConsumer in a SourceTask of a source > connector to poll messages from an aws msk. > > if we try to produce data into the source topic immediately after the > connector gets into the running state we sometimes notice that > kafkaconsumer misses some of the records written into the kafka source > topic. (note that sourceTask#start involves subscribing to the topic and > sourceTask#poll involves the acutal kafkaConsumer.poll) call. > > i hypothesised that this might be due to kafka Consumer taking time to find > the offset for the topic and given that we have the auto.offset.reset > config set to latest this is the reason why it's happening, but I am unsure > on what observability i can use to confirm this (I have set up the log > level to error). but can it happen that the kafka connector is in running > state but it's polling method which basically uses kaflaConsumer.poll() is > still awaiting offset allocation? is there a way to verify this ina. more > efficient manner? >