Thanks Matthias. I am facing the issue when I try to call resume
from the scheduled method.
I was getting an exception that KafkaConsumer is not safe for multi-threaded
access. I am trying to see how I can call pause and resume on the same
thread. There will be only one thread running for consumption.
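
One way to keep both calls on the polling thread, as a minimal sketch: the
scheduled method only sets a thread-safe flag, and the poll loop checks the
flag and calls resume itself. `ResumeSignal` and its method names are
hypothetical helpers, not part of the Kafka client; `kafkaConsumer` and
`consumerPaused` are assumed to be the fields from the skeleton code quoted
below.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hand-off flag between a scheduler thread and the single polling thread.
 * Hypothetical helper, not part of the Kafka client API.
 */
final class ResumeSignal {
    private final AtomicBoolean resumeRequested = new AtomicBoolean(false);

    /** Safe to call from any thread, e.g. from the @Scheduled method. */
    void requestResume() {
        resumeRequested.set(true);
    }

    /**
     * Meant to be called only by the polling thread, once per loop iteration.
     * Returns true at most once per request and clears the flag.
     */
    boolean takeRequest() {
        return resumeRequested.getAndSet(false);
    }
}

// Assumed use inside the existing poll loop (polling thread only).
// takeRequest() is evaluated first, so a request that arrives while the
// consumer is not paused is discarded instead of lingering:
//
//     if (resumeSignal.takeRequest() && consumerPaused) {
//         kafkaConsumer.resume(kafkaConsumer.paused());
//         consumerPaused = false;
//     }
//
// And in the scheduler, instead of touching the consumer directly:
//
//     resumeSignal.requestResume();
```

Since the loop keeps calling poll while paused, the flag should be noticed
within roughly one poll timeout, and the consumer is never accessed from the
scheduler thread.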


On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> There is no issue if you call `poll()` while all partitions are paused. In
> fact, if you want to make sure that the consumer does not fall out of
> the consumer group, you must call `poll()` at regular intervals so that
> you do not hit the `max.poll.interval.ms` timeout.
>
>
> -Matthias
>
> On 10/24/18 10:25 AM, pradeep s wrote:
> > Pause and resume are required since I am running a pod in Kubernetes and I
> > am not shutting down the app.
> >
> > On Tue, Oct 23, 2018 at 10:33 PM pradeep s <sreekumar.prad...@gmail.com>
> > wrote:
> >
> >> Hi,
> >> I have a requirement to have Kafka streaming start at a scheduled time and
> >> then pause the stream when the consumer poll returns empty fetches for
> >> 3 or more polls.
> >>
> >> I am starting a consumer poll loop during application startup using a
> >> single-thread executor and then pausing the consumer when the poll
> >> returns empty for 3 polls.
> >>
> >> When the schedule kicks in, I am calling `consumer.resume`.
> >>
> >> Is this approach correct?
> >> Will it cause any issue if the loop calls poll on a paused consumer?
> >>
> >> Skeleton Code
> >> ============
> >>
> >> public class OfferItemImageConsumer implements Runnable {
> >>
> >> @Override
> >> public void run() {
> >>     try {
> >>         do {
> >>             ConsumerRecords<String, String> records =
> >>                 kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
> >>             writeAndPauseEmptyFetch(records);
> >>             processRecords(records);
> >>         } while (!consumerLoopClosed.get());
> >>     } catch (RuntimeException ex) {
> >>         handleConsumerLoopException(ex);
> >>     } finally {
> >>         kafkaConsumer.close();
> >>     }
> >> }
> >>
> >>
> >> private void writeAndPauseEmptyFetch(ConsumerRecords<String, String> records) {
> >>     if (records.isEmpty()) {
> >>         emptyFetchCount++;
> >>     }
> >>     if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
> >>         writeImageData();
> >>         emptyFetchCount = 0;
> >>         kafkaConsumer.pause(kafkaConsumer.assignment());
> >>         consumerPaused = true;
> >>     }
> >> }
> >>
> >> }
> >>
> >> =================================
> >>
> >> public class ItemImageStreamScheduler {
> >>     private static final int TERMINATION_TIMEOUT = 10;
> >>
> >>
> >>     private ExecutorService executorService =
> >>         Executors.newSingleThreadExecutor();
> >>
> >>     private final OfferItemImageConsumer offerItemImageConsumer;
> >>     private final ItemImageStreamConfig itemImageStreamConfig;
> >>     private final KafkaConsumer<String, String> kafkaConsumer;
> >>
> >>     @EventListener(ApplicationReadyEvent.class)
> >>     void startStreaming() {
> >>         executorService.submit(offerItemImageConsumer);
> >>     }
> >>     @Scheduled
> >>     void resumeStreaming() {
> >>         kafkaConsumer.resume(kafkaConsumer.assignment());
> >>     }
> >>
> >>
> >> }
> >>
> >> Thanks
> >>
> >> Pradeep
> >>
> >>
> >
>
>
