Hi Dean, I agree, it would be good to support volume-based offset commits.
To give a sink connector more control over flushes, rather than adding a new task.put() variant, I think it may be better to add an API like "requestFlush()" to the `SinkTaskContext` (and perhaps also `SourceTaskContext`).

Another option could be to add something like "offset.flush.record.count" in addition to the existing "offset.flush.interval.ms". Both options could be configured, but whichever triggers first would reset the other.

What do you think?

Best,

Shikhar

On Fri, Sep 9, 2016 at 9:55 AM Dean Arnold <renodino...@gmail.com> wrote:

> I have a need for volume based commits in a few sink connectors, and the
> current interval-only based commit strategy creates some headaches. After
> skimming the code, it appears that an alternate put() method that returned
> a Map<TopicPartition, Long> might be used to allow a sink connector to keep
> Kafka up to date wrt committed offsets in the sink system, so that Kafka
> might defer or reset its commit interval for topics/partitions (at least,
> for the consumer used for SinkTasks). It wouldn't replace interval based
> flush(), but hopefully flush() would be invoked much less frequently, and
> permit the flush interval to be increased, so the sink connector can better
> optimize its commit batches. Eg, the sink could almost always commit 5000
> records, rather than whatever happened to be buffered up when flush() was
> called, which might be very small or very large.
>
> I'm thinking of something like
>
> Map<TopicPartition, Long> putAndReport(Collection<SinkRecord> record)
>
> Put the records in the sink, and, if the operation results in committing
> records to the sink system, report the largest committed offset of each
> committed topic/partition. Returns null if no commit occurs.
>
> I'm not certain how a rebalance might affect the processing; since a sink
> would still need to support existing interval based commits, the rebalance
> (and any sink recovery) would presumably work the same.
>
> Am I overlooking any systemic issues that would preclude such a feature?
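
To make the "whichever happens first would reset the other" idea from the reply concrete, here is a minimal sketch in Java. It is only an illustration under stated assumptions: `FlushTrigger` and `onRecords` are hypothetical names, not part of Kafka Connect, and the record-count threshold stands in for the proposed "offset.flush.record.count" setting, which does not exist today.

```java
/**
 * Hypothetical helper (not a Kafka Connect class) that decides when an
 * offset flush is due, based on a record count or an elapsed interval,
 * whichever is reached first.
 */
final class FlushTrigger {
    private final long maxRecords;  // stands in for the proposed "offset.flush.record.count"
    private final long intervalMs;  // stands in for the existing "offset.flush.interval.ms"
    private long recordsSinceFlush = 0;
    private long lastFlushMs;

    FlushTrigger(long maxRecords, long intervalMs, long nowMs) {
        this.maxRecords = maxRecords;
        this.intervalMs = intervalMs;
        this.lastFlushMs = nowMs;
    }

    /**
     * Accounts for a delivered batch and reports whether a flush is due.
     * When either threshold fires, both the counter and the timer are
     * reset, so the other threshold starts over as well.
     */
    boolean onRecords(int count, long nowMs) {
        recordsSinceFlush += count;
        boolean due = recordsSinceFlush >= maxRecords
                || nowMs - lastFlushMs >= intervalMs;
        if (due) {
            recordsSinceFlush = 0;
            lastFlushMs = nowMs;
        }
        return due;
    }
}
```

A caller would invoke `onRecords` after each batch is handed to the task and trigger the flush when it returns true. Passing the clock value in as a parameter keeps the sketch deterministic and easy to test.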