One item to be aware with pause and resume - is that it applies to
partitions currently assigned to the consumer.

But partitions can get revoked or additional partitions can get assigned to
consumer.

With reassigned , you might be expecting the consumer to be paused but
suddenly start getting messages because a new partition got assigned.

Use the RebalanceListener to pause or resume any new partitions

regards

On Thu, Oct 25, 2018 at 3:15 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> That is correct: clients are not thread safe.
>
> You can use an `AtomicBoolean needToResume` that you share over both
> threads and that is initially false.
>
> In your scheduled method, you set the variable to true.
>
> In your main consumer, each time before you call poll(), you check if
> the variable is set to true. If yes, you resume() and reset the variable
> to false.
>
> Hope this helps.
>
> -Matthias
>
>
> On 10/25/18 2:09 PM, pradeep s wrote:
> > Thanks Matthias. I am facing the issue  when i am trying to call the
> resume
> > from the scheduled method .
> > Was getting exception that  Kafka Consumer is not safe for multi threaded
> > access . I am trying to see how can call pause and resume on the same
> > thread. There will be only one thread running for consumption.
> >
> >
> > On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> There is no issue if you call `poll()` is all partitions are paused. If
> >> fact, if you want to make sure that the consumer does not fall out of
> >> the consumer group, you must call `poll()` in regular interval to not
> >> hit `max.poll.interval.ms` timeout.
> >>
> >>
> >> -Matthias
> >>
> >> On 10/24/18 10:25 AM, pradeep s wrote:
> >>> Pause and resume is required since i am running a pod in kubernetes
> and i
> >>> am not shutting down the app
> >>>
> >>> On Tue, Oct 23, 2018 at 10:33 PM pradeep s <
> sreekumar.prad...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>> I have a requirement to have kafka streaming start at scheduled time
> and
> >>>> then pause the stream when the consumer poll returns empty fetches for
> >> 3 or
> >>>> more polls.
> >>>>
> >>>> I am starting a consumer poll loop during application startup using a
> >>>> singled thread executor and then pausing the consumer when the poll is
> >>>> returning empty for 3 polls.
> >>>>
> >>>> When the schedule kicks in , i am calling *consumer.resume.*
> >>>>
> >>>> Is this approach correct ?
> >>>> Will it cause any issue If the  consumer calls poll on a paused
> >> consumer.
> >>>>
> >>>> Skeleton Code
> >>>> ============
> >>>>
> >>>> public class *OfferItemImageConsumer* implements Runnable {
> >>>>
> >>>> @Override
> >>>> public void run() {
> >>>>     try {
> >>>>         do {
> >>>>             ConsumerRecords<String, String> records =
> >> kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
> >>>>             writeAndPauseEmptyFetch(records);
> >>>>             processRecords(records);
> >>>>         } while (!consumerLoopClosed.get());
> >>>>     } catch (RuntimeException ex) {
> >>>>         handleConsumerLoopException(ex);
> >>>>     } finally {
> >>>>         kafkaConsumer.close();
> >>>>     }
> >>>> }
> >>>>
> >>>>
> >>>> private void writeAndPauseEmptyFetch(ConsumerRecords<String, String>
> >> records) {
> >>>>     if (records.isEmpty()) {
> >>>>         emptyFetchCount++;
> >>>>     }
> >>>>     if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
> >>>>         writeImageData();
> >>>>         emptyFetchCount = 0;
> >>>>         kafkaConsumer.pause(kafkaConsumer.assignment());
> >>>>         consumerPaused = true;
> >>>>     }
> >>>> }
> >>>>
> >>>> }
> >>>>
> >>>> =================================
> >>>>
> >>>> public class *ItemImageStreamScheduler* {
> >>>>     private static final int TERMINATION_TIMEOUT = 10;
> >>>>
> >>>>
> >>>>     private ExecutorService executorService =
> >> Executors.newSingleThreadExecutor();
> >>>>
> >>>>     private final OfferItemImageConsumer offerItemImageConsumer;
> >>>>     private final ItemImageStreamConfig itemImageStreamConfig;
> >>>>     private final KafkaConsumer<String, String> kafkaConsumer;
> >>>>
> >>>>     @EventListener(ApplicationReadyEvent.class)
> >>>>     void startStreaming() {
> >>>>         executorService.submit(offerItemImageConsumer);
> >>>>     }
> >>>>     @Scheduled
> >>>>     void resumeStreaming() {
> >>>>         kafkaConsumer.resume(kafkaConsumer.assignment());
> >>>>     }
> >>>>
> >>>>
> >>>> }
> >>>>
> >>>> Thanks
> >>>>
> >>>> Pradeep
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

-- 
http://khangaonkar.blogspot.com/

Reply via email to