[ https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15536415#comment-15536415 ]
Dean Arnold commented on KAFKA-4161:
------------------------------------
I'm the requester for this feature. My particular need is that the sink system
must commit in batches of a fixed number of records (or as close to that as
possible) for optimal storage management, reads, and analytics. The same notion
applies elsewhere, e.g., optimizing compression with large-ish buffers, or
filling an HDFS chunk before writing it out. The feature also gives sink
systems a mechanism to adjust the frequency of their commits dynamically,
independent of the flush interval.
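A minimal sketch of the fixed-size batching described above. `BufferedRecord` and `FixedSizeBatcher` are hypothetical names invented for illustration (they are not Kafka Connect classes), and the actual write to the external system is left as a placeholder. The point is that the sink writes only in batches of exactly `batchSize` records, on its own schedule, and tracks which offsets it has durably stored so it could report them back to Kafka.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record type (not a Kafka Connect class): partition key, offset, payload.
final class BufferedRecord {
    final String partition;
    final long offset;
    final String value;

    BufferedRecord(String partition, long offset, String value) {
        this.partition = partition;
        this.offset = offset;
        this.value = value;
    }
}

// Hypothetical sink-side batcher: writes to the external system only in
// batches of exactly batchSize records, independent of any flush interval.
final class FixedSizeBatcher {
    private final int batchSize;
    private final List<BufferedRecord> buffer = new ArrayList<>();
    // Per partition: next offset to consume, i.e. one past the last record
    // the external system durably holds (Kafka's commit convention).
    private final Map<String, Long> committed = new HashMap<>();

    FixedSizeBatcher(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    // Buffers one record; returns true if this call filled and wrote a batch.
    boolean add(BufferedRecord record) {
        buffer.add(record);
        if (buffer.size() < batchSize) {
            return false;
        }
        writeBatch();
        return true;
    }

    private void writeBatch() {
        // Placeholder: a real sink would write `buffer` to its storage here
        // and only update `committed` once that write is durable.
        for (BufferedRecord r : buffer) {
            committed.merge(r.partition, r.offset + 1, Long::max);
        }
        buffer.clear();
    }

    // Offsets the sink could report back to Kafka as safe to commit.
    Map<String, Long> committedOffsets() {
        return new HashMap<>(committed);
    }

    int bufferedCount() {
        return buffer.size();
    }
}
```

With `batchSize = 3` and records at offsets 10, 11, and 12 on one partition, the first two `add` calls only buffer; the third writes the batch, and the offset reported for that partition becomes 13 (one past offset 12).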
My primary concern is the contract of flush(): the sink connector must commit
whatever data it has accumulated, even if it's just a few records, since
flush() implies that Kafka is free to commit/compact all records delivered up
to that flush(). If the sink connector can instead report back what it has
already committed, the flush interval can be set fairly large to minimize these
small-buffer flushes, while Kafka still commits/compacts at reasonable
intervals based on the sink's reported committed offsets. Kafka might then
never (or only rarely) need to invoke flush() explicitly on the sink, provided
it resets the flush timer to start at the delivery time of the oldest
uncommitted offset: in effect, a sliding flush window based on the sink's
reported commits.
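The sliding flush window can be sketched as a small worker-side tracker. This is an illustration under stated assumptions, not how the Kafka Connect worker is implemented: `SlidingFlushWindow`, `delivered`, `sinkCommitted`, and `shouldForceFlush` are hypothetical names, and the sink is assumed to report commits using Kafka's "next offset to consume" convention. The forced-flush deadline is the delivery time of the oldest record the sink has not yet reported as committed, plus the flush interval.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical worker-side tracker for the "sliding flush window": the
// forced-flush deadline is measured from the delivery time of the oldest
// record the sink has not yet reported as committed.
final class SlidingFlushWindow {
    // One delivered record: its offset and when it was handed to the sink.
    private static final class Delivery {
        final long offset;
        final long deliveredAtMs;

        Delivery(long offset, long deliveredAtMs) {
            this.offset = offset;
            this.deliveredAtMs = deliveredAtMs;
        }
    }

    private final long flushIntervalMs;
    // Per partition, uncommitted deliveries in offset order.
    private final Map<String, Deque<Delivery>> pending = new HashMap<>();

    SlidingFlushWindow(long flushIntervalMs) {
        this.flushIntervalMs = flushIntervalMs;
    }

    // Worker hands a record to the sink task at time nowMs.
    void delivered(String partition, long offset, long nowMs) {
        pending.computeIfAbsent(partition, p -> new ArrayDeque<>())
               .addLast(new Delivery(offset, nowMs));
    }

    // Sink reports that every offset below nextOffset is durably stored.
    void sinkCommitted(String partition, long nextOffset) {
        Deque<Delivery> queue = pending.get(partition);
        if (queue == null) {
            return;
        }
        while (!queue.isEmpty() && queue.peekFirst().offset < nextOffset) {
            queue.pollFirst();
        }
    }

    // When a forced flush() would be due, or empty if nothing is uncommitted.
    OptionalLong flushDeadlineMs() {
        boolean found = false;
        long oldest = 0L;
        for (Deque<Delivery> queue : pending.values()) {
            Delivery head = queue.peekFirst();
            if (head != null && (!found || head.deliveredAtMs < oldest)) {
                oldest = head.deliveredAtMs;
                found = true;
            }
        }
        return found ? OptionalLong.of(oldest + flushIntervalMs) : OptionalLong.empty();
    }

    boolean shouldForceFlush(long nowMs) {
        OptionalLong deadline = flushDeadlineMs();
        return deadline.isPresent() && nowMs >= deadline.getAsLong();
    }
}
```

Because the deadline moves forward each time the sink reports a commit, a sink that commits on its own schedule keeps pushing it back, and the worker only forces flush() when some record has waited a full interval without being committed. When everything delivered has been committed, there is no deadline at all.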
> Decouple flush and offset commits
> ---------------------------------
>
> Key: KAFKA-4161
> URL: https://issues.apache.org/jira/browse/KAFKA-4161
> Project: Kafka
> Issue Type: New Feature
> Components: KafkaConnect
> Reporter: Shikhar Bhushan
> Assignee: Ewen Cheslack-Postava
> Labels: needs-kip
>
> It is desirable to have, in addition to the time-based flush interval,
> volume- or size-based commits. For example, a sink connector that buffers by
> record count may want to request a flush when the buffer is full, or
> when a sufficient amount of data has been buffered in a file.
> Having a method such as {{requestFlush()}} on the {{SinkTaskContext}} would
> allow connectors to have flexible policies around flushes. This would be
> in addition to the time-interval-based flushes controlled by
> {{offset.flush.interval.ms}}, whose clock should be reset whenever any
> kind of flush happens.
> We should probably also support requesting flushes via the
> {{SourceTaskContext}} for consistency, though no use case comes to mind
> right off the bat.
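The proposed trigger logic (time-based flushes plus task-requested flushes, with the clock reset on any flush) can be sketched as follows. This is a hypothetical model, not Kafka's actual worker code: `FlushScheduler`, `shouldFlush`, and `onFlushCompleted` are illustrative names, `requestFlush()` is only the method name proposed in this issue, and the sketch assumes the task's request and the worker loop run on the same thread.

```java
// Hypothetical worker-side scheduler (not Kafka's actual implementation)
// combining the time-based trigger configured by offset.flush.interval.ms
// with a task-initiated requestFlush(). Assumes requestFlush() and the worker
// loop run on the same thread, so no synchronization is used.
final class FlushScheduler {
    private final long flushIntervalMs;
    private long lastFlushMs;
    private boolean flushRequested;

    FlushScheduler(long flushIntervalMs, long startMs) {
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushMs = startMs;
    }

    // What a task would call through its context when, e.g., its buffer is full.
    void requestFlush() {
        flushRequested = true;
    }

    // Polled by the worker loop: flush if requested or if the interval elapsed.
    boolean shouldFlush(long nowMs) {
        return flushRequested || nowMs - lastFlushMs >= flushIntervalMs;
    }

    // Called once flush() and the offset commit finish; per the proposal, any
    // flush, whatever triggered it, restarts the interval clock.
    void onFlushCompleted(long nowMs) {
        flushRequested = false;
        lastFlushMs = nowMs;
    }
}
```

For example, with a 60-second interval starting at t = 0, a requested flush completed at t = 10 s moves the next time-based flush from t = 60 s to t = 70 s.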
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)