I need volume-based commits in a few sink connectors, and the current
interval-only commit strategy creates some headaches. After skimming the
code, it appears that an alternate put() method returning a
Map<TopicPartition, Long> could allow a sink connector to keep Kafka up to
date with respect to offsets committed in the sink system, so that Kafka
could defer or reset its commit interval for those topics/partitions (at
least, for the consumer used by SinkTasks). It wouldn't replace
interval-based flush(), but hopefully flush() would be invoked much less
frequently, permitting the flush interval to be increased so the sink
connector can better optimize its commit batches. E.g., the sink could
almost always commit 5000 records, rather than whatever happened to be
buffered up when flush() was called, which might be very small or very large.

I'm thinking of something like

Map<TopicPartition, Long> putAndReport(Collection<SinkRecord> records)

Put the records in the sink, and, if the operation results in committing
records to the sink system, report the largest committed offset for each
committed topic/partition. Returns null if no commit occurred.
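To make the intended contract concrete, here is a minimal sketch of how a
batching sink might implement putAndReport(). The BatchingSink class, its
batch size, and the stand-in TopicPartition/SinkRecord records are all
hypothetical illustrations; in real Kafka Connect these types would come
from org.apache.kafka.common and org.apache.kafka.connect.sink.

```java
import java.util.*;

// Stand-in types for illustration only; real code would use Kafka's
// TopicPartition and SinkRecord classes.
record TopicPartition(String topic, int partition) {}
record SinkRecord(TopicPartition tp, long offset, String value) {}

class BatchingSink {
    private final int batchSize;
    private final List<SinkRecord> buffer = new ArrayList<>();

    BatchingSink(int batchSize) { this.batchSize = batchSize; }

    // Buffer incoming records; once batchSize records have accumulated,
    // commit the batch to the sink system and report the largest
    // committed offset per topic/partition. Returns null when no commit
    // occurred, per the proposed contract.
    Map<TopicPartition, Long> putAndReport(Collection<SinkRecord> records) {
        buffer.addAll(records);
        if (buffer.size() < batchSize) {
            return null; // nothing committed on this call
        }
        // ... write the buffered batch to the sink system here ...
        Map<TopicPartition, Long> committed = new HashMap<>();
        for (SinkRecord r : buffer) {
            committed.merge(r.tp(), r.offset(), Math::max);
        }
        buffer.clear();
        return committed;
    }
}

public class Main {
    public static void main(String[] args) {
        BatchingSink sink = new BatchingSink(5000);
        TopicPartition tp = new TopicPartition("events", 0);
        Map<TopicPartition, Long> result = null;
        // Feed 100-record batches until the sink reports a commit.
        for (long base = 0; base < 6000 && result == null; base += 100) {
            List<SinkRecord> batch = new ArrayList<>();
            for (long o = base; o < base + 100; o++) {
                batch.add(new SinkRecord(tp, o, "v"));
            }
            result = sink.putAndReport(batch);
        }
        // The first commit fires once 5000 records are buffered,
        // covering offsets 0..4999.
        System.out.println(result.get(tp));
    }
}
```

With this shape, the framework could treat a non-null return as a signal to
advance the consumer's committed offsets and reset the flush timer for the
reported partitions.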

I'm not certain how a rebalance might affect the processing; since a sink
would still need to support the existing interval-based commits, the
rebalance (and any sink recovery) would presumably work the same.

Am I overlooking any systemic issues that would preclude such a feature?
