Hi Manoj/Matthias,
My requirement is to run the consumer once daily, stream the messages,
and pause when I encounter a few empty fetches.
I am planning to run two consumers and to pause consumption based on
empty fetches, for a topic with 4 partitions.
To avoid the consumer multi-threaded access issue, I run the consumer,
exit the poll loop, and call pause on the same thread. In this case, I
will not be polling continuously.
When the next schedule kicks in, I will resume polling.
Will the consumer resume call cause issues, since the scheduled run is
triggered a long time after polling stopped? (Or is the old approach of
continuous polling the correct one?)
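For reference, this is how I understood Matthias's AtomicBoolean
suggestion with continuous polling (a rough sketch only; the class,
field, and method names are placeholders mirroring my skeleton further
down in this thread):

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch only: names are placeholders; the real skeleton is quoted below.
public class SingleThreadPausingConsumer implements Runnable {

    private final KafkaConsumer<String, String> kafkaConsumer;
    // Shared with the scheduler thread; the scheduler only flips this flag
    // and never touches the consumer itself.
    private final AtomicBoolean needToResume = new AtomicBoolean(false);
    private final AtomicBoolean consumerLoopClosed = new AtomicBoolean(false);

    public SingleThreadPausingConsumer(KafkaConsumer<String, String> kafkaConsumer) {
        this.kafkaConsumer = kafkaConsumer;
    }

    // Called by the @Scheduled method from another thread.
    public void requestResume() {
        needToResume.set(true);
    }

    @Override
    public void run() {
        try {
            while (!consumerLoopClosed.get()) {
                // Resume on the polling thread itself, right before poll().
                if (needToResume.compareAndSet(true, false)) {
                    kafkaConsumer.resume(kafkaConsumer.paused());
                }
                ConsumerRecords<String, String> records =
                        kafkaConsumer.poll(Duration.ofMillis(500));
                writeAndPauseEmptyFetch(records); // pauses after the empty-fetch threshold
                processRecords(records);
            }
        } finally {
            kafkaConsumer.close();
        }
    }

    private void writeAndPauseEmptyFetch(ConsumerRecords<String, String> records) {
        // Same empty-fetch counting and pause logic as in the skeleton below.
    }

    private void processRecords(ConsumerRecords<String, String> records) {
        // Business logic.
    }
}

This keeps poll() running even while paused, so the consumer stays in the
group and does not hit the max.poll.interval.ms timeout.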
Also, Manoj, can you please explain the rebalance scenario: if the
consumer is paused for two partitions and gets an assignment for another
two partitions (because of a pod termination), how can I pause
consumption if it is not the scheduled time to process the records? Is
something along the lines of the sketch below the right direction?
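Rough sketch for the rebalance case (assuming the listener shares the
same "consumer should be paused" flag that my poll loop sets, and that it
is registered via kafkaConsumer.subscribe(topics, listener)):

import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch only: shouldBePaused is assumed to be the same flag the poll
// loop sets when it hits the empty-fetch threshold outside the window.
public class PausingRebalanceListener implements ConsumerRebalanceListener {

    private final KafkaConsumer<String, String> kafkaConsumer;
    private final AtomicBoolean shouldBePaused;

    public PausingRebalanceListener(KafkaConsumer<String, String> kafkaConsumer,
                                    AtomicBoolean shouldBePaused) {
        this.kafkaConsumer = kafkaConsumer;
        this.shouldBePaused = shouldBePaused;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to do for the pause state; revoked partitions are gone.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // This callback runs on the polling thread inside poll(), so calling
        // pause() here is safe. Newly assigned partitions start unpaused, so
        // re-pause them if it is not the scheduled processing window.
        if (shouldBePaused.get()) {
            kafkaConsumer.pause(partitions);
        }
    }
}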
Thanks
Pradeep

On Thu, Oct 25, 2018 at 5:48 PM Manoj Khangaonkar <khangaon...@gmail.com>
wrote:

> One item to be aware of with pause and resume is that it applies only to
> partitions currently assigned to the consumer.
>
> But partitions can get revoked or additional partitions can get assigned to
> the consumer.
>
> With a reassignment, you might be expecting the consumer to be paused but
> suddenly start getting messages because a new partition got assigned.
>
> Use the RebalanceListener to pause or resume any new partitions
>
> regards
>
> On Thu, Oct 25, 2018 at 3:15 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > That is correct: clients are not thread safe.
> >
> > You can use an `AtomicBoolean needToResume` that you share between both
> > threads and that is initially false.
> >
> > In your scheduled method, you set the variable to true.
> >
> > In your main consumer, each time before you call poll(), you check if
> > the variable is set to true. If yes, you resume() and reset the variable
> > to false.
> >
> > Hope this helps.
> >
> > -Matthias
> >
> >
> > On 10/25/18 2:09 PM, pradeep s wrote:
> > > Thanks Matthias. I am facing the issue when I am trying to call the
> > > resume from the scheduled method.
> > > I was getting an exception that the Kafka consumer is not safe for
> > > multi-threaded access. I am trying to see how I can call pause and
> > > resume on the same thread. There will be only one thread running for
> > > consumption.
> > >
> > >
> > > On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax <matth...@confluent.io>
> > > wrote:
> > >
> > >> There is no issue if you call `poll()` when all partitions are paused.
> > >> In fact, if you want to make sure that the consumer does not fall out
> > >> of the consumer group, you must call `poll()` in regular intervals to
> > >> not hit the `max.poll.interval.ms` timeout.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 10/24/18 10:25 AM, pradeep s wrote:
> > >>> Pause and resume is required since I am running a pod in Kubernetes
> > >>> and I am not shutting down the app.
> > >>>
> > >>> On Tue, Oct 23, 2018 at 10:33 PM pradeep s <sreekumar.prad...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi,
> > >>>> I have a requirement to have Kafka streaming start at a scheduled
> > >>>> time and then pause the stream when the consumer poll returns empty
> > >>>> fetches for 3 or more polls.
> > >>>>
> > >>>> I am starting a consumer poll loop during application startup using
> > >>>> a single-thread executor and then pausing the consumer when the poll
> > >>>> returns empty for 3 polls.
> > >>>>
> > >>>> When the schedule kicks in, I am calling *consumer.resume()*.
> > >>>>
> > >>>> Is this approach correct?
> > >>>> Will it cause any issue if poll is called on a paused consumer?
> > >>>>
> > >>>> Skeleton Code
> > >>>> ============
> > >>>>
> > >>>> public class *OfferItemImageConsumer* implements Runnable {
> > >>>>
> > >>>> @Override
> > >>>> public void run() {
> > >>>>     try {
> > >>>>         do {
> > >>>>             ConsumerRecords<String, String> records =
> > >>>>                 kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
> > >>>>             writeAndPauseEmptyFetch(records);
> > >>>>             processRecords(records);
> > >>>>         } while (!consumerLoopClosed.get());
> > >>>>     } catch (RuntimeException ex) {
> > >>>>         handleConsumerLoopException(ex);
> > >>>>     } finally {
> > >>>>         kafkaConsumer.close();
> > >>>>     }
> > >>>> }
> > >>>>
> > >>>>
> > >>>> private void writeAndPauseEmptyFetch(ConsumerRecords<String, String> records) {
> > >>>>     if (records.isEmpty()) {
> > >>>>         emptyFetchCount++;
> > >>>>     }
> > >>>>     if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
> > >>>>         writeImageData();
> > >>>>         emptyFetchCount = 0;
> > >>>>         kafkaConsumer.pause(kafkaConsumer.assignment());
> > >>>>         consumerPaused = true;
> > >>>>     }
> > >>>> }
> > >>>>
> > >>>> }
> > >>>>
> > >>>> =================================
> > >>>>
> > >>>> public class *ItemImageStreamScheduler* {
> > >>>>     private static final int TERMINATION_TIMEOUT = 10;
> > >>>>
> > >>>>
> > >>>>     private ExecutorService executorService =
> > >>>>         Executors.newSingleThreadExecutor();
> > >>>>
> > >>>>     private final OfferItemImageConsumer offerItemImageConsumer;
> > >>>>     private final ItemImageStreamConfig itemImageStreamConfig;
> > >>>>     private final KafkaConsumer<String, String> kafkaConsumer;
> > >>>>
> > >>>>     @EventListener(ApplicationReadyEvent.class)
> > >>>>     void startStreaming() {
> > >>>>         executorService.submit(offerItemImageConsumer);
> > >>>>     }
> > >>>>     @Scheduled
> > >>>>     void resumeStreaming() {
> > >>>>         kafkaConsumer.resume(kafkaConsumer.assignment());
> > >>>>     }
> > >>>>
> > >>>>
> > >>>> }
> > >>>>
> > >>>> Thanks
> > >>>>
> > >>>> Pradeep
> > >>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >
> >
> >
>
> --
> http://khangaonkar.blogspot.com/
>
