Great, thanks.

On Tue, Sep 13, 2016 at 5:35 PM, Shikhar Bhushan <shik...@confluent.io> wrote:
> Good point about size-based.
>
> I created a JIRA to track this feature:
> https://issues.apache.org/jira/browse/KAFKA-4161
>
> On Tue, Sep 13, 2016 at 4:19 PM Dean Arnold <renodino...@gmail.com> wrote:
>
> > Yes, using the SinkTaskContext as a notification channel works as well,
> > so that's fine.
> >
> > While a config value might be useful, it's probably not safe to assume
> > that a sink would always want the same number of msgs/records for each
> > commit, since the commit volume might be defined in bytes, e.g.,
> > accumulating enough data to fill a 128 MB HDFS chunk could be any number
> > of msgs/records.
> >
> > In fact, this mechanism is really more general than just volume based
> > commits; it's really about providing sinks a flexible commit capability
> > (e.g., some sink event requires premature commit, or otherwise requires
> > modification of the commit interval).
> >
> > On Tue, Sep 13, 2016 at 11:37 AM, Shikhar Bhushan <shik...@confluent.io>
> > wrote:
> >
> > > Hi Dean,
> > >
> > > I agree, it would be good to support volume-based offset commits.
> > >
> > > For giving more control on flushes to a sink connector, rather than
> > > adding a new task.put() variant, I think it may be better to add an API
> > > like "requestFlush()" to the `SinkTaskContext` (and perhaps also
> > > `SinkTaskContext`).
> > >
> > > Another option could be to add something like
> > > "offset.flush.record.count" in addition to the existing
> > > "offset.flush.interval.ms". Both options could be configured, but
> > > whichever happens first would reset the other.
> > >
> > > What do you think?
> > >
> > > Best,
> > >
> > > Shikhar
> > >
> > > On Fri, Sep 9, 2016 at 9:55 AM Dean Arnold <renodino...@gmail.com>
> > > wrote:
> > >
> > > > I have a need for volume based commits in a few sink connectors, and
> > > > the current interval-only based commit strategy creates some
> > > > headaches. After skimming the code, it appears that an alternate put()
> > > > method that returned a Map<TopicPartition, Long> might be used to
> > > > allow a sink connector to keep Kafka up to date wrt committed offsets
> > > > in the sink system, so that Kafka might defer or reset its commit
> > > > interval for topics/partitions (at least, for the consumer used for
> > > > SinkTasks). It wouldn't replace interval based flush(), but hopefully
> > > > flush() would be invoked much less frequently, and permit the flush
> > > > interval to be increased, so the sink connector can better optimize
> > > > its commit batches. E.g., the sink could almost always commit 5000
> > > > records, rather than whatever happened to be buffered up when flush()
> > > > was called, which might be very small or very large.
> > > >
> > > > I'm thinking of something like
> > > >
> > > >     Map<TopicPartition, Long> putAndReport(Collection<SinkRecord> records)
> > > >
> > > > Put the records in the sink, and, if the operation results in
> > > > committing records to the sink system, report the largest committed
> > > > offset of each committed topic/partition. Returns null if no commit
> > > > occurs.
> > > >
> > > > I'm not certain how a rebalance might affect the processing; since a
> > > > sink would still need to support existing interval based commits, the
> > > > rebalance (and any sink recovery) would presumably work the same.
> > > >
> > > > Am I overlooking any systemic issues that would preclude such a
> > > > feature?
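The volume-based commit idea in this thread can be sketched in plain Java. This is a hypothetical, self-contained illustration, not Kafka Connect's API: SimpleRecord, BufferingSink, and the "topic-partition" string key are made-up stand-ins for SinkRecord and TopicPartition so the example compiles without Kafka on the classpath. It combines the proposed putAndReport() shape (return the largest committed offset per partition, or null when nothing was committed) with the "whichever happens first resets the other" rule suggested for offset.flush.record.count, applied here to a byte limit and a record limit.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class VolumeCommitSketch {

    // Hypothetical stand-in for a Connect SinkRecord; keeps only what this sketch uses.
    static final class SimpleRecord {
        final String topic;
        final int partition;
        final long offset;
        final byte[] value;

        SimpleRecord(String topic, int partition, long offset, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }

        // "topic-partition" string key, used here instead of Kafka's TopicPartition.
        String partitionKey() {
            return topic + "-" + partition;
        }
    }

    // Hypothetical sink that batches writes and commits when EITHER the buffered
    // byte count reaches maxBytes OR the buffered record count reaches maxRecords;
    // each commit resets both counters.
    static final class BufferingSink {
        private final long maxBytes;
        private final int maxRecords;
        private long bufferedBytes = 0;
        private int bufferedRecords = 0;
        // Highest offset buffered per partition since the last commit.
        private final Map<String, Long> pending = new HashMap<>();

        BufferingSink(long maxBytes, int maxRecords) {
            this.maxBytes = maxBytes;
            this.maxRecords = maxRecords;
        }

        // Modeled on the proposed putAndReport(): returns the largest committed
        // offset per partition, or null if this call triggered no commit.
        Map<String, Long> putAndReport(Collection<SimpleRecord> records) {
            Map<String, Long> committed = null;
            for (SimpleRecord r : records) {
                pending.merge(r.partitionKey(), r.offset, Math::max);
                bufferedBytes += r.value.length;
                bufferedRecords++;
                if (bufferedBytes >= maxBytes || bufferedRecords >= maxRecords) {
                    // A real connector would write the buffered batch to the sink system here.
                    if (committed == null) {
                        committed = new HashMap<>();
                    }
                    for (Map.Entry<String, Long> e : pending.entrySet()) {
                        committed.merge(e.getKey(), e.getValue(), Math::max);
                    }
                    pending.clear();
                    bufferedBytes = 0;
                    bufferedRecords = 0;
                }
            }
            return committed;
        }
    }

    public static void main(String[] args) {
        // Tiny thresholds for demonstration: commit at 10 bytes or 3 records.
        BufferingSink sink = new BufferingSink(10, 3);
        byte[] fourBytes = new byte[4];

        // 2 records / 8 bytes: below both thresholds, so no commit is reported.
        Map<String, Long> first = sink.putAndReport(Arrays.asList(
                new SimpleRecord("t", 0, 100, fourBytes),
                new SimpleRecord("t", 0, 101, fourBytes)));
        System.out.println(first);  // prints "null"

        // A third record brings the buffer to 12 bytes and 3 records, so a commit
        // is reported covering both partitions seen since the last commit.
        Map<String, Long> second = sink.putAndReport(Arrays.asList(
                new SimpleRecord("t", 1, 7, fourBytes)));
        System.out.println(second.get("t-0") + " " + second.get("t-1"));  // prints "101 7"
    }
}
```

With a byte limit of 128L * 1024 * 1024 and a very large record limit, the same trigger would approximate the HDFS-chunk case described in the thread. Interval-based flush() would still be needed as a fallback for partitions that never reach either threshold, as the original proposal notes.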