Good point about size-based commits.

I created a JIRA to track this feature:
https://issues.apache.org/jira/browse/KAFKA-4161

On Tue, Sep 13, 2016 at 4:19 PM Dean Arnold <renodino...@gmail.com> wrote:

> Yes, using the SinkTaskContext as a notification channel works as well, so
> that's fine.
>
> While a config value might be useful, it's probably not safe to assume that
> a sink would always want the same number of msgs/records for each commit,
> since the commit volume might be defined in bytes, e.g., accumulating
> enough data to fill a 128 MB HDFS chunk could be any number of
> msgs/records.
>
> In fact, this mechanism is really more general than just volume-based
> commits; it's really about providing sinks a flexible commit capability
> (e.g., some sink event requires premature commit, or otherwise requires
> modification of the commit interval).
>
> On Tue, Sep 13, 2016 at 11:37 AM, Shikhar Bhushan <shik...@confluent.io>
> wrote:
>
> > Hi Dean,
> >
> > I agree, it would be good to support volume-based offset commits.
> >
> > For giving more control on flushes to a sink connector, rather than
> > adding a new task.put() variant, I think it may be better to add an API
> > like "requestFlush()" to the `SinkTaskContext` (and perhaps also the
> > `SourceTaskContext`).
> >
> > Another option could be to add something like "offset.flush.record.count"
> > in addition to the existing "offset.flush.interval.ms". Both options
> > could be configured, but whichever happens first would reset the other.
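
The "requestFlush()" API discussed here is only a proposal, not an existing Kafka Connect method. As a rough illustration, the sketch below uses minimal stand-in types (the real `SinkTaskContext` and `SinkRecord` live in `org.apache.kafka.connect.sink`) to show how a sink task might use such a hook to trigger a volume-based flush; all class names and the byte-counting logic are hypothetical:

```java
import java.util.Collection;

// Minimal stand-ins for the Connect types discussed in this thread; the real
// classes live in org.apache.kafka.connect.sink. requestFlush() is only a
// proposal from this thread, not an existing API.
interface SinkTaskContext {
    void requestFlush();
}

class SinkRecord {
    final byte[] value;
    SinkRecord(byte[] value) { this.value = value; }
}

// Sketch: ask the framework for a flush once a byte threshold is reached,
// e.g. enough data to fill a 128 MB HDFS chunk.
class VolumeTriggeredTask {
    private final SinkTaskContext context;
    private final long targetBytes;
    private long bufferedBytes = 0;

    VolumeTriggeredTask(SinkTaskContext context, long targetBytes) {
        this.context = context;
        this.targetBytes = targetBytes;
    }

    void put(Collection<SinkRecord> records) {
        for (SinkRecord r : records) {
            bufferedBytes += r.value.length;
        }
        if (bufferedBytes >= targetBytes) {
            context.requestFlush(); // hypothetical API from this thread
            bufferedBytes = 0;
        }
    }
}
```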
> >
> > What do you think?
> >
> > Best,
> >
> > Shikhar
> >
> > On Fri, Sep 9, 2016 at 9:55 AM Dean Arnold <renodino...@gmail.com>
> > wrote:
> >
> > > I have a need for volume-based commits in a few sink connectors, and
> > > the current interval-only commit strategy creates some headaches.
> > > After skimming the code, it appears that an alternate put() method
> > > returning a Map<TopicPartition, Long> might be used to allow a sink
> > > connector to keep Kafka up to date w.r.t. committed offsets in the
> > > sink system, so that Kafka might defer or reset its commit interval
> > > for topics/partitions (at least for the consumer used for SinkTasks).
> > > It wouldn't replace interval-based flush(), but hopefully flush()
> > > would be invoked much less frequently, and permit the flush interval
> > > to be increased, so the sink connector can better optimize its commit
> > > batches. E.g., the sink could almost always commit 5000 records,
> > > rather than whatever happened to be buffered up when flush() was
> > > called, which might be very small or very large.
> > >
> > > I'm thinking of something like
> > >
> > > Map<TopicPartition, Long> putAndReport(Collection<SinkRecord> records)
> > >
> > > Put the records in the sink and, if the operation results in
> > > committing records to the sink system, report the largest committed
> > > offset for each committed topic/partition. Returns null if no commit
> > > occurs.
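
The proposed semantics above can be sketched in a self-contained way. All types below are simplified stand-ins for the real Connect classes (`org.apache.kafka.common.TopicPartition`, `org.apache.kafka.connect.sink.SinkRecord`), and the batch size is a constructor parameter purely for illustration:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for org.apache.kafka.common.TopicPartition.
class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
    @Override public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) return false;
        TopicPartition t = (TopicPartition) o;
        return topic.equals(t.topic) && partition == t.partition;
    }
    @Override public int hashCode() { return 31 * topic.hashCode() + partition; }
}

// Simplified stand-in for org.apache.kafka.connect.sink.SinkRecord.
class SinkRecord {
    final TopicPartition tp;
    final long offset;
    SinkRecord(TopicPartition tp, long offset) {
        this.tp = tp;
        this.offset = offset;
    }
}

// Hypothetical task implementing the putAndReport() semantics proposed above.
class BatchingSinkTask {
    private final int batchSize;
    private final List<SinkRecord> buffer = new ArrayList<>();

    BatchingSinkTask(int batchSize) { this.batchSize = batchSize; }

    // Buffer records; once a full batch is "committed" to the sink system,
    // report the largest committed offset per topic/partition, else null.
    Map<TopicPartition, Long> putAndReport(Collection<SinkRecord> records) {
        buffer.addAll(records);
        if (buffer.size() < batchSize) {
            return null; // nothing committed on this call
        }
        Map<TopicPartition, Long> committed = new HashMap<>();
        for (SinkRecord r : buffer) {
            committed.merge(r.tp, r.offset, Math::max);
        }
        buffer.clear(); // the batch would be written to the sink here
        return committed;
    }
}
```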
> > >
> > > I'm not certain how a rebalance might affect the processing; since a
> > > sink would still need to support existing interval-based commits, the
> > > rebalance (and any sink recovery) would presumably work the same.
> > >
> > > Am I overlooking any systemic issues that would preclude such a
> > > feature?
> > >
> >
>
