Great, thanks.

On Tue, Sep 13, 2016 at 5:35 PM, Shikhar Bhushan <shik...@confluent.io>
wrote:

> Good point about size-based.
>
> I created a JIRA to track this feature:
> https://issues.apache.org/jira/browse/KAFKA-4161
>
> On Tue, Sep 13, 2016 at 4:19 PM Dean Arnold <renodino...@gmail.com> wrote:
>
> > Yes, using the SinkTaskContext as a notification channel works as well,
> > so that's fine.
> >
> > While a config value might be useful, it's probably not safe to assume
> > that a sink would always want the same number of msgs/records for each
> > commit, since the commit volume might be defined in bytes, e.g.,
> > accumulating enough data to fill a 128 MB HDFS chunk could take any
> > number of msgs/records.
> >
> > In fact, this mechanism is really more general than just volume-based
> > commits; it's really about providing sinks a flexible commit capability
> > (e.g., some sink event requires a premature commit, or otherwise
> > requires modifying the commit interval).
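> >
> > As a concrete sketch of the byte-threshold case, assuming the proposed
> > requestFlush() were added to SinkTaskContext (hypothetical API, not in
> > Connect today; estimateSize()/buffer()/commitChunkToSink() are
> > sink-specific helpers):
> >
> >     // inside a SinkTask: buffer records and ask the framework to
> >     // commit offsets once roughly one 128 MB HDFS chunk is full
> >     private static final long CHUNK_BYTES = 128L * 1024 * 1024;
> >     private long bufferedBytes = 0;
> >
> >     @Override
> >     public void put(Collection<SinkRecord> records) {
> >         for (SinkRecord record : records) {
> >             bufferedBytes += estimateSize(record);
> >             buffer(record);
> >         }
> >         if (bufferedBytes >= CHUNK_BYTES) {
> >             commitChunkToSink();    // write the chunk to HDFS
> >             bufferedBytes = 0;
> >             context.requestFlush(); // proposed: commit offsets now
> >         }
> >     }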
> >
> > On Tue, Sep 13, 2016 at 11:37 AM, Shikhar Bhushan <shik...@confluent.io>
> > wrote:
> >
> > > Hi Dean,
> > >
> > > I agree, it would be good to support volume-based offset commits.
> > >
> > > To give a sink connector more control over flushes, rather than
> > > adding a new task.put() variant, I think it may be better to add an
> > > API like "requestFlush()" to the `SinkTaskContext` (and perhaps also
> > > the `SourceTaskContext`).
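> > >
> > > Roughly, with requestFlush() being the proposed addition (the other
> > > methods listed are the existing API):
> > >
> > >     public interface SinkTaskContext {
> > >         // existing: offset(...), timeout(...), assignment(),
> > >         // pause(...), resume(...)
> > >
> > >         /**
> > >          * Ask the framework to commit consumed offsets at the next
> > >          * opportunity, rather than waiting for the periodic flush.
> > >          */
> > >         void requestFlush();
> > >     }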
> > >
> > > Another option could be to add something like
> > > "offset.flush.record.count" in addition to the existing
> > > "offset.flush.interval.ms". Both options could be configured, but
> > > whichever triggers first would reset the other.
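> > >
> > > E.g., in the worker config ("offset.flush.record.count" being the
> > > hypothetical new setting; "offset.flush.interval.ms" already exists):
> > >
> > >     # commit offsets every 60s or every 5000 records, whichever
> > >     # comes first; either event resets the other
> > >     offset.flush.interval.ms=60000
> > >     offset.flush.record.count=5000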
> > >
> > > What do you think?
> > >
> > > Best,
> > >
> > > Shikhar
> > >
> > > On Fri, Sep 9, 2016 at 9:55 AM Dean Arnold <renodino...@gmail.com>
> > > wrote:
> > >
> > > > I have a need for volume-based commits in a few sink connectors,
> > > > and the current interval-only commit strategy creates some
> > > > headaches. After skimming the code, it appears that an alternate
> > > > put() method returning a Map<TopicPartition, Long> could be used to
> > > > let a sink connector keep Kafka up to date wrt offsets committed in
> > > > the sink system, so that Kafka might defer or reset its commit
> > > > interval for topics/partitions (at least, for the consumer used for
> > > > SinkTasks). It wouldn't replace interval-based flush(), but
> > > > hopefully flush() would be invoked much less frequently, and the
> > > > flush interval could be increased, so the sink connector can better
> > > > optimize its commit batches. E.g., the sink could almost always
> > > > commit 5000 records, rather than whatever happened to be buffered
> > > > when flush() was called, which might be very small or very large.
> > > >
> > > > I'm thinking of something like:
> > > >
> > > >     Map<TopicPartition, Long> putAndReport(Collection<SinkRecord> records)
> > > >
> > > > Put the records in the sink and, if the operation results in
> > > > committing records to the sink system, report the largest committed
> > > > offset for each committed topic/partition. Returns null if no
> > > > commit occurs.
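> > > >
> > > > A rough sketch of how a sink task might implement it
> > > > (putAndReport() being the proposed method;
> > > > buffer()/batchFull()/commitBatchToSink()/lastCommittedBatch() are
> > > > sink-specific helpers):
> > > >
> > > >     @Override
> > > >     public Map<TopicPartition, Long> putAndReport(
> > > >             Collection<SinkRecord> records) {
> > > >         buffer(records);
> > > >         if (!batchFull()) {
> > > >             return null;         // nothing committed this time
> > > >         }
> > > >         commitBatchToSink();     // e.g., a full 5000-record batch
> > > >         // report the largest committed offset per topic/partition
> > > >         Map<TopicPartition, Long> committed = new HashMap<>();
> > > >         for (SinkRecord r : lastCommittedBatch()) {
> > > >             TopicPartition tp =
> > > >                 new TopicPartition(r.topic(), r.kafkaPartition());
> > > >             committed.merge(tp, r.kafkaOffset(), Math::max);
> > > >         }
> > > >         return committed;
> > > >     }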
> > > >
> > > > I'm not certain how a rebalance might affect the processing; since
> > > > a sink would still need to support the existing interval-based
> > > > commits, the rebalance (and any sink recovery) would presumably
> > > > work the same.
> > > >
> > > > Am I overlooking any systemic issues that would preclude such a
> > > > feature?
> > > >
> > >
> >
>