Robert Metzger created FLINK-3264:
-------------------------------------

             Summary: Add load shedding policy into Kafka Consumers
                 Key: FLINK-3264
                 URL: https://issues.apache.org/jira/browse/FLINK-3264
             Project: Flink
          Issue Type: Improvement
          Components: Kafka Connector
            Reporter: Robert Metzger


There are situations where Flink's Kafka Consumer is not able to consume 
everything produced into a topic, for example when a single Flink instance is 
subscribed to a busy Kafka topic. See this user request: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Frequent-exceptions-killing-streaming-job-td4323.html

I think we should allow users to control the behavior of the Kafka consumer in 
those situations.

I had an offline discussion with [~StephanEwen] about this, and we think that 
allowing users to pass a LoadSheddingPolicy to the KafkaConsumer would be 
the best solution.
In the policy, users can define how often the consumer requests the latest 
offsets of the subscribed partitions. These requests can be based either on 
time (every n ms) or on record count (every n-th record). The policy can then 
decide to skip a certain number of offsets (possibly jumping straight to the 
latest offset).
With this "offset skipping" approach, we avoid fetching records we cannot 
process anyway.
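A minimal Java sketch of what such a policy could look like. Only the name LoadSheddingPolicy comes from this issue; the method names, both implementation classes, and their parameters are hypothetical and do not exist in Flink. The sketch assumes Kafka's offset convention: {{currentOffset}} is the next offset to fetch and {{latestOffset}} is the log-end offset (the offset the next produced record will get).

{code:java}
/**
 * Hypothetical load-shedding policy for a Kafka consumer (not an existing Flink API).
 * currentOffset = next offset to fetch; latestOffset = log-end offset of the partition.
 */
interface LoadSheddingPolicy {
    /** Called once per emitted record; returns true when the latest offset should be requested. */
    boolean shouldRequestLatestOffset(long nowMillis);

    /** Returns the offset to continue fetching from; returning currentOffset means "skip nothing". */
    long nextFetchOffset(long currentOffset, long latestOffset);
}

/** Hypothetical: checks every n-th record and skips ahead so that at most maxLag records remain unread. */
final class BoundedLagPolicy implements LoadSheddingPolicy {
    private final long everyNRecords;
    private final long maxLag;
    private long recordsSinceCheck = 0;

    BoundedLagPolicy(long everyNRecords, long maxLag) {
        this.everyNRecords = everyNRecords;
        this.maxLag = maxLag;
    }

    @Override
    public boolean shouldRequestLatestOffset(long nowMillis) {
        // Time is ignored; this policy only counts records.
        if (++recordsSinceCheck >= everyNRecords) {
            recordsSinceCheck = 0;
            return true;
        }
        return false;
    }

    @Override
    public long nextFetchOffset(long currentOffset, long latestOffset) {
        long lag = latestOffset - currentOffset;
        return lag > maxLag ? latestOffset - maxLag : currentOffset;
    }
}

/** Hypothetical: checks at most once every intervalMillis and always jumps to the latest offset. */
final class SkipToLatestPolicy implements LoadSheddingPolicy {
    private final long intervalMillis;
    private long lastCheckMillis;

    SkipToLatestPolicy(long intervalMillis, long startMillis) {
        this.intervalMillis = intervalMillis;
        this.lastCheckMillis = startMillis;
    }

    @Override
    public boolean shouldRequestLatestOffset(long nowMillis) {
        if (nowMillis - lastCheckMillis >= intervalMillis) {
            lastCheckMillis = nowMillis;
            return true;
        }
        return false;
    }

    @Override
    public long nextFetchOffset(long currentOffset, long latestOffset) {
        // Never move backwards, even if the reported latest offset is stale.
        return Math.max(currentOffset, latestOffset);
    }
}
{code}

The two classes show one count-based and one time-based trigger; either trigger could be combined with either skip decision (bounded lag or jump to latest).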

In the 0.9 consumer, there doesn't seem to be an API for requesting the latest 
offset of a {{TopicPartition}}. I'll ask on the Kafka mailing list what the 
status is there.
With {{seek()}}, we can fetch from any offset.

In the 0.8 SimpleConsumer, there is a method for requesting the offsets:
{code}
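import java.util.Collections;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
// Request the latest offset (LatestTime) of one partition; maxNumOffsets = 1
TopicAndPartition tp = new TopicAndPartition(topic, partition);
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
        Collections.singletonMap(tp, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1)),
        kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
OffsetResponse response = consumer.getOffsetsBefore(request);
long latestOffset = response.offsets(topic, partition)[0];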
{code}
The fetch offset is controlled within the {{LegacyFetcher}}.
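To illustrate how a skip decision could feed back into the fetch offset, here is a toy Java simulation. The class {{OffsetSkippingSimulation}}, the method {{lagAfter}}, and the producer/consumer rates are hypothetical; this is not the {{LegacyFetcher}} code and uses no Kafka API. Reassigning {{fetchOffset}} merely stands in for changing the fetcher's offset (or calling {{seek()}}).

{code:java}
import java.util.function.LongBinaryOperator;

/**
 * Hypothetical toy simulation of offset skipping in a fetch loop.
 * The "partition" is just two counters; no Kafka or Flink classes are involved.
 */
final class OffsetSkippingSimulation {
    /**
     * Consumes recordsToConsume records while a producer appends producedPerConsumed
     * records for each one consumed. Every checkEveryN records, the policy receives
     * (nextFetchOffset, latestOffset) and returns the offset to continue from.
     * Returns the final lag, i.e. latestOffset - fetchOffset.
     */
    static long lagAfter(int recordsToConsume, int producedPerConsumed, int checkEveryN,
                         LongBinaryOperator policy) {
        long latestOffset = 0; // log-end offset of the simulated partition
        long fetchOffset = 0;  // next offset the consumer will read
        for (int i = 1; i <= recordsToConsume; i++) {
            latestOffset += producedPerConsumed; // the producer outpaces the consumer
            fetchOffset++;                       // the consumer processes one record
            if (i % checkEveryN == 0) {
                // "Request the latest offset" and let the policy decide whether to skip.
                fetchOffset = policy.applyAsLong(fetchOffset, latestOffset);
            }
        }
        return latestOffset - fetchOffset;
    }
}
{code}

With a policy that never skips ({{(cur, latest) -> cur}}), consuming 1000 records while 3000 are produced leaves a lag of 2000 records; a policy that keeps at most 100 records behind ends with a lag of 100, at the price of never reading the skipped records.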



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
