Robert Metzger created FLINK-3264:
-------------------------------------
Summary: Add load shedding policy into Kafka Consumers
Key: FLINK-3264
URL: https://issues.apache.org/jira/browse/FLINK-3264
Project: Flink
Issue Type: Improvement
Components: Kafka Connector
Reporter: Robert Metzger
There are situations in which Flink's Kafka Consumer is not able to consume
everything produced into a topic, for example when a single Flink instance is
subscribed to a busy Kafka topic. See this user request:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Frequent-exceptions-killing-streaming-job-td4323.html
I think we should allow users to control the behavior of the Kafka consumer in
those situations.
I had an offline discussion with [~StephanEwen] about this, and we think that
allowing users to pass a LoadSheddingPolicy to the KafkaConsumer would be
the best solution.
In the policy, users can define how often the consumer requests the latest
offsets of the subscribed partitions. The requests can be based either on time
(every n ms) or on record count (every n-th record). Based on the response, the
policy can then decide to skip a certain number of offsets (possibly jumping
straight to the latest offset).
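To make the trigger part more concrete, here is a rough sketch of how the "every n ms or every n-th record" decision could look. The class {{OffsetCheckTrigger}} and its parameters are hypothetical (nothing like it exists in Flink yet), and the clock is passed in as a {{LongSupplier}} only so that the logic can be tested without sleeping:

{code:java}
import java.util.function.LongSupplier;

// Hypothetical helper (not an existing Flink API): decides when the consumer
// should request the latest offsets, either every intervalMs milliseconds or
// every everyNRecords records, whichever comes first.
class OffsetCheckTrigger {
    private final long intervalMs;     // <= 0 disables the time-based check
    private final long everyNRecords;  // <= 0 disables the count-based check
    private final LongSupplier clock;  // e.g. System::currentTimeMillis
    private long lastCheckMs;
    private long recordsSinceCheck;

    OffsetCheckTrigger(long intervalMs, long everyNRecords, LongSupplier clock) {
        this.intervalMs = intervalMs;
        this.everyNRecords = everyNRecords;
        this.clock = clock;
        this.lastCheckMs = clock.getAsLong();
    }

    // Call once per consumed record. Returns true when the latest offsets
    // should be requested, and resets both the record counter and the timer.
    boolean onRecord() {
        recordsSinceCheck++;
        long now = clock.getAsLong();
        boolean countDue = everyNRecords > 0 && recordsSinceCheck >= everyNRecords;
        boolean timeDue = intervalMs > 0 && now - lastCheckMs >= intervalMs;
        if (countDue || timeDue) {
            recordsSinceCheck = 0;
            lastCheckMs = now;
            return true;
        }
        return false;
    }
}
{code}

In this sketch the time-based check is only evaluated when a record arrives, which seems acceptable because a consumer that receives no records is not falling behind.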
With this "offset skipping" approach, we avoid fetching records that we cannot
process anyway.
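The skipping decision itself could look roughly like the following. The name {{LoadSheddingPolicy}} comes from the proposal above, but the single-method signature and the {{SkipToBoundedLag}} implementation are assumptions for illustration only. "Latest offset" is assumed to mean the log end offset, i.e. the offset the broker would assign to the next produced record, so {{maxLag = 0}} corresponds to jumping straight to the latest offset:

{code:java}
// Hypothetical policy interface; only the name is taken from the proposal.
interface LoadSheddingPolicy {
    // Returns the offset to continue fetching from, given the offset the
    // consumer would fetch next and the latest (log end) offset of the partition.
    long nextFetchOffset(long nextOffset, long latestOffset);
}

// Hypothetical implementation: if the consumer is more than maxLag records
// behind, skip ahead so that exactly maxLag records remain unread.
// maxLag = 0 means "jump to the latest offset".
class SkipToBoundedLag implements LoadSheddingPolicy {
    private final long maxLag;

    SkipToBoundedLag(long maxLag) {
        if (maxLag < 0) {
            throw new IllegalArgumentException("maxLag must be >= 0");
        }
        this.maxLag = maxLag;
    }

    @Override
    public long nextFetchOffset(long nextOffset, long latestOffset) {
        long lag = latestOffset - nextOffset;
        if (lag <= maxLag) {
            return nextOffset; // keeping up (or stale latest offset): skip nothing
        }
        return latestOffset - maxLag; // drops the oldest (lag - maxLag) records
    }
}
{code}

For example, with {{maxLag = 1000}}, a consumer at offset 500 in a partition whose latest offset is 5000 would continue from offset 4000, skipping 3500 records it would otherwise fetch and most likely not be able to process in time.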
In the 0.9 consumer, there doesn't seem to be an API for requesting the latest
offset of a {{TopicPartition}}. I'll ask on the Kafka mailing list what the status is there.
With {{seek()}}, we can fetch from any offset.
In the 0.8 SimpleConsumer, there is a method for requesting the offsets:
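{code}
// assumes consumer (kafka.javaapi.consumer.SimpleConsumer), topic and partition are in scope
java.util.Map<kafka.common.TopicAndPartition, kafka.api.PartitionOffsetRequestInfo> requestInfo =
    new java.util.HashMap<>();
requestInfo.put(new kafka.common.TopicAndPartition(topic, partition),
    new kafka.api.PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
    requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
kafka.javaapi.OffsetResponse response = consumer.getOffsetsBefore(request);
long latestOffset = response.offsets(topic, partition)[0];
{code}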
The fetch offset is controlled within the {{LegacyFetcher}}.