I need volume-based commits in a few sink connectors, and the current interval-only commit strategy creates some headaches. After skimming the code, it appears that an alternate put() method returning a Map<TopicPartition, Long> could let a sink connector keep Kafka up to date with respect to offsets committed in the sink system, so that Kafka could defer or reset its commit interval for those topics/partitions (at least for the consumer used by SinkTasks). It wouldn't replace the interval-based flush(), but flush() would hopefully be invoked much less frequently, permitting the flush interval to be increased so the sink connector can better optimize its commit batches. E.g., the sink could almost always commit 5000 records, rather than whatever happened to be buffered when flush() was called, which might be very small or very large.
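To make the pain point concrete, here is a minimal sketch (stand-in types and names, not the real Connect API) of the interval-only pattern: put() just buffers, and flush() commits whatever happens to be buffered when the timer fires, so batch sizes swing with the arrival rate.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of an interval-driven sink task. IntervalFlushSketch is a
// hypothetical stand-in for a SinkTask; records are plain Strings.
public class IntervalFlushSketch {
    private final List<String> buffer = new ArrayList<>();

    // Analogous to put(Collection<SinkRecord>): just buffer the records.
    public void put(List<String> records) {
        buffer.addAll(records);
    }

    // Analogous to flush(): commit whatever is buffered, whether that
    // is 2 records or 5000. Returns the committed batch size.
    public int flush() {
        int committed = buffer.size();
        buffer.clear();
        return committed;
    }

    public static void main(String[] args) {
        IntervalFlushSketch task = new IntervalFlushSketch();
        task.put(List.of("a", "b"));                      // slow period
        System.out.println("flush 1 commits " + task.flush() + " records");
        for (int i = 0; i < 5000; i++) {                  // burst of traffic
            task.put(List.of("r" + i));
        }
        System.out.println("flush 2 commits " + task.flush() + " records");
    }
}
```

The two flushes commit wildly different batch sizes (2 vs. 5000 here), which is exactly what a volume-based commit would smooth out.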
I'm thinking of something like:

Map<TopicPartition, Long> putAndReport(Collection<SinkRecord> records)

Put the records in the sink and, if the operation results in records being committed to the sink system, report the largest committed offset for each committed topic/partition; return null if no commit occurs. I'm not certain how a rebalance might affect the processing; since a sink would still need to support the existing interval-based commits, the rebalance (and any sink recovery) would presumably work the same way. Am I overlooking any systemic issues that would preclude such a feature?
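A hypothetical sketch of that contract, with stand-in record/partition types rather than the real Connect classes: the task commits to the sink once a size threshold is reached and reports the largest committed offset per topic/partition, returning null on calls where nothing was committed.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical implementation of the proposed putAndReport(). The
// COMMIT_SIZE threshold and the stand-in TopicPartition/SinkRecord
// types are assumptions for illustration only.
public class PutAndReportSketch {
    record TopicPartition(String topic, int partition) {}
    record SinkRecord(String topic, int partition, long offset) {}

    static final int COMMIT_SIZE = 5000;
    private final List<SinkRecord> buffer = new ArrayList<>();

    public Map<TopicPartition, Long> putAndReport(Collection<SinkRecord> records) {
        buffer.addAll(records);
        if (buffer.size() < COMMIT_SIZE) {
            return null; // nothing committed to the sink on this call
        }
        // "Commit" the buffered batch to the sink system, then report
        // the largest committed offset for each topic/partition.
        Map<TopicPartition, Long> committed = new HashMap<>();
        for (SinkRecord r : buffer) {
            committed.merge(new TopicPartition(r.topic(), r.partition()),
                            r.offset(), Math::max);
        }
        buffer.clear();
        return committed;
    }

    public static void main(String[] args) {
        PutAndReportSketch task = new PutAndReportSketch();
        List<SinkRecord> small = List.of(new SinkRecord("topic", 0, 1L));
        // Below COMMIT_SIZE, so no commit is reported back to Kafka.
        System.out.println(task.putAndReport(small));
    }
}
```

On a null return Kafka would fall back to its interval timer; on a non-null return it could advance the consumer offsets and defer or reset the timer for the reported partitions.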