Good point about size-based. I created a JIRA to track this feature: https://issues.apache.org/jira/browse/KAFKA-4161
On Tue, Sep 13, 2016 at 4:19 PM Dean Arnold <renodino...@gmail.com> wrote:

> Yes, using the SinkTaskContext as a notification channel works as well, so
> that's fine.
>
> While a config value might be useful, it's probably not safe to assume that
> a sink would always want the same number of msgs/records for each commit,
> since the commit volume might be defined in bytes, e.g., accumulating
> enough data to fill a 128 MB HDFS chunk could be any number of
> msgs/records.
>
> In fact, this mechanism is really more general than just volume-based
> commits; it's really about providing sinks a flexible commit capability
> (e.g., some sink event requires a premature commit, or otherwise requires
> modification of the commit interval).
>
> On Tue, Sep 13, 2016 at 11:37 AM, Shikhar Bhushan <shik...@confluent.io>
> wrote:
>
> > Hi Dean,
> >
> > I agree, it would be good to support volume-based offset commits.
> >
> > For giving more control on flushes to a sink connector, rather than
> > adding a new task.put() variant, I think it may be better to add an API
> > like "requestFlush()" to the `SinkTaskContext` (and perhaps also
> > `SourceTaskContext`).
> >
> > Another option could be to add something like "offset.flush.record.count"
> > in addition to the existing "offset.flush.interval.ms". Both options
> > could be configured, but whichever happens first would reset the other.
> >
> > What do you think?
> >
> > Best,
> >
> > Shikhar
> >
> > On Fri, Sep 9, 2016 at 9:55 AM Dean Arnold <renodino...@gmail.com>
> > wrote:
> >
> > > I have a need for volume-based commits in a few sink connectors, and
> > > the current interval-only commit strategy creates some headaches.
> > > After skimming the code, it appears that an alternate put() method
> > > that returned a Map<TopicPartition, Long> might be used to allow a
> > > sink connector to keep Kafka up to date wrt committed offsets in the
> > > sink system, so that Kafka might defer or reset its commit interval
> > > for topics/partitions (at least, for the consumer used for SinkTasks).
> > > It wouldn't replace interval-based flush(), but hopefully flush()
> > > would be invoked much less frequently, permitting the flush interval
> > > to be increased so the sink connector can better optimize its commit
> > > batches. E.g., the sink could almost always commit 5000 records,
> > > rather than whatever happened to be buffered up when flush() was
> > > called, which might be very small or very large.
> > >
> > > I'm thinking of something like
> > >
> > > Map<TopicPartition, Long> putAndReport(Collection<SinkRecord> records)
> > >
> > > Put the records in the sink, and, if the operation results in
> > > committing records to the sink system, report the largest committed
> > > offset of each committed topic/partition. Returns null if no commit
> > > occurs.
> > >
> > > I'm not certain how a rebalance might affect the processing; since a
> > > sink would still need to support existing interval-based commits, the
> > > rebalance (and any sink recovery) would presumably work the same.
> > >
> > > Am I overlooking any systemic issues that would preclude such a
> > > feature?
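For readers following along, the proposed putAndReport() contract can be sketched in isolation. This is only an illustration of the behavior Dean describes, not real Kafka Connect code: the TopicPartition and SinkRecord records below are simplified stand-ins for the actual org.apache.kafka classes, and VolumeBatchingSink is a hypothetical sink that buffers records and commits once a volume threshold is reached, reporting the largest committed offset per topic/partition (or null when it only buffered).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for the Kafka Connect types (NOT the real API).
record TopicPartition(String topic, int partition) {}
record SinkRecord(String topic, int partition, long kafkaOffset, Object value) {}

// Hypothetical sink illustrating the proposed putAndReport() semantics:
// buffer incoming records, and once a volume threshold is hit, "commit"
// the batch to the sink system and report the largest committed offset
// for each topic/partition so Kafka could defer its own offset commit.
class VolumeBatchingSink {
    private final int batchSize;
    private final List<SinkRecord> buffer = new ArrayList<>();

    VolumeBatchingSink(int batchSize) {
        this.batchSize = batchSize;
    }

    // Returns the largest committed offset per topic/partition if a commit
    // occurred, or null if the records were only buffered.
    Map<TopicPartition, Long> putAndReport(Collection<SinkRecord> records) {
        buffer.addAll(records);
        if (buffer.size() < batchSize) {
            return null; // not enough volume yet; defer the commit
        }
        // "Commit" the batch, tracking the highest offset per partition.
        Map<TopicPartition, Long> committed = new HashMap<>();
        for (SinkRecord r : buffer) {
            committed.merge(new TopicPartition(r.topic(), r.partition()),
                            r.kafkaOffset(), Math::max);
        }
        buffer.clear();
        return committed;
    }
}
```

A time-based flush() would still run alongside this, as the thread notes, so the interval path remains the fallback for records that never reach the volume threshold.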