One thing to be aware of with pause and resume is that they apply only to the partitions currently assigned to the consumer.
But partitions can be revoked, or additional partitions can be assigned to the consumer. After a reassignment, you might expect the consumer to be paused but suddenly start getting messages because a new partition was assigned. Use a ConsumerRebalanceListener to pause or resume any newly assigned partitions.

regards

On Thu, Oct 25, 2018 at 3:15 PM Matthias J. Sax <matth...@confluent.io> wrote:

> That is correct: clients are not thread safe.
>
> You can use an `AtomicBoolean needToResume` that you share over both
> threads and that is initially false.
>
> In your scheduled method, you set the variable to true.
>
> In your main consumer, each time before you call poll(), you check if
> the variable is set to true. If yes, you resume() and reset the variable
> to false.
>
> Hope this helps.
>
> -Matthias
>
>
> On 10/25/18 2:09 PM, pradeep s wrote:
> > Thanks Matthias. I am facing the issue when I try to call resume
> > from the scheduled method.
> > I was getting an exception that KafkaConsumer is not safe for
> > multi-threaded access. I am trying to see how I can call pause and
> > resume on the same thread. There will be only one thread running for
> > consumption.
> >
> >
> > On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> There is no issue if you call `poll()` when all partitions are paused. In
> >> fact, if you want to make sure that the consumer does not fall out of
> >> the consumer group, you must call `poll()` at regular intervals to not
> >> hit the `max.poll.interval.ms` timeout.
> >>
> >>
> >> -Matthias
> >>
> >> On 10/24/18 10:25 AM, pradeep s wrote:
> >>> Pause and resume is required since I am running a pod in Kubernetes
> >>> and I am not shutting down the app.
> >>>
> >>> On Tue, Oct 23, 2018 at 10:33 PM pradeep s <sreekumar.prad...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>> I have a requirement to have Kafka streaming start at a scheduled time
> >>>> and then pause the stream when the consumer poll returns empty fetches
> >>>> for 3 or more polls.
> >>>>
> >>>> I am starting a consumer poll loop during application startup using a
> >>>> single-thread executor and then pausing the consumer when the poll
> >>>> returns empty for 3 polls.
> >>>>
> >>>> When the schedule kicks in, I am calling consumer.resume.
> >>>>
> >>>> Is this approach correct?
> >>>> Will it cause any issue if the consumer calls poll on a paused
> >>>> consumer?
> >>>>
> >>>> Skeleton Code
> >>>> ============
> >>>>
> >>>> public class OfferItemImageConsumer implements Runnable {
> >>>>
> >>>>     @Override
> >>>>     public void run() {
> >>>>         try {
> >>>>             do {
> >>>>                 ConsumerRecords<String, String> records =
> >>>>                         kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
> >>>>                 writeAndPauseEmptyFetch(records);
> >>>>                 processRecords(records);
> >>>>             } while (!consumerLoopClosed.get());
> >>>>         } catch (RuntimeException ex) {
> >>>>             handleConsumerLoopException(ex);
> >>>>         } finally {
> >>>>             kafkaConsumer.close();
> >>>>         }
> >>>>     }
> >>>>
> >>>>     private void writeAndPauseEmptyFetch(ConsumerRecords<String, String> records) {
> >>>>         if (records.isEmpty()) {
> >>>>             emptyFetchCount++;
> >>>>         }
> >>>>         if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
> >>>>             writeImageData();
> >>>>             emptyFetchCount = 0;
> >>>>             kafkaConsumer.pause(kafkaConsumer.assignment());
> >>>>             consumerPaused = true;
> >>>>         }
> >>>>     }
> >>>>
> >>>> }
> >>>>
> >>>> =================================
> >>>>
> >>>> public class ItemImageStreamScheduler {
> >>>>     private static final int TERMINATION_TIMEOUT = 10;
> >>>>
> >>>>     private ExecutorService executorService =
> >>>>             Executors.newSingleThreadExecutor();
> >>>>
> >>>>     private final OfferItemImageConsumer offerItemImageConsumer;
> >>>>     private final ItemImageStreamConfig itemImageStreamConfig;
> >>>>     private final KafkaConsumer<String, String> kafkaConsumer;
> >>>>
> >>>>     @EventListener(ApplicationReadyEvent.class)
> >>>>     void startStreaming() {
> >>>>         executorService.submit(offerItemImageConsumer);
> >>>>     }
> >>>>
> >>>>     @Scheduled
> >>>>     void resumeStreaming() {
> >>>>         kafkaConsumer.resume(kafkaConsumer.assignment());
> >>>>     }
> >>>>
> >>>> }
> >>>>
> >>>> Thanks
> >>>>
> >>>> Pradeep

-- http://khangaonkar.blogspot.com/
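
For reference, the two suggestions in this thread (hand the resume request to the polling thread through a shared AtomicBoolean, and pause partitions that get assigned while the consumer is paused) could be sketched roughly as below. This is only a sketch under stated assumptions: PauseResumeSketch, PartitionedSource, PausingPollLoop and IdleSource are hypothetical names, the interface merely stands in for the few KafkaConsumer calls used above so that the example runs without the Kafka client library, partitions are plain strings rather than TopicPartition objects, and counting consecutive empty polls is one possible reading of the requirement.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

final class PauseResumeSketch {

    // Hypothetical stand-in for the consumer calls discussed in this thread.
    // Partitions are plain strings here; the real client uses TopicPartition.
    interface PartitionedSource {
        int poll();                                  // number of records fetched
        Set<String> assignment();                    // currently assigned partitions
        void pause(Collection<String> partitions);
        void resume(Collection<String> partitions);
    }

    // Owns the source. Everything except requestResume() must run on the
    // polling thread.
    static final class PausingPollLoop {
        private final PartitionedSource source;
        private final int emptyFetchThreshold;
        private final AtomicBoolean needToResume = new AtomicBoolean(false);
        private int emptyFetchCount = 0;
        private boolean paused = false;

        PausingPollLoop(PartitionedSource source, int emptyFetchThreshold) {
            this.source = source;
            this.emptyFetchThreshold = emptyFetchThreshold;
        }

        // Safe to call from the scheduler thread: it only flips the shared flag.
        void requestResume() {
            needToResume.set(true);
        }

        boolean isPaused() {
            return paused;
        }

        // Rebalance hook: partitions assigned while paused are paused as well.
        void onPartitionsAssigned(Collection<String> added) {
            if (paused) {
                source.pause(added);
            }
        }

        // One loop iteration. poll() is still called while paused.
        void pollOnce() {
            // getAndSet reads and clears the flag in one atomic step.
            if (needToResume.getAndSet(false) && paused) {
                source.resume(source.assignment());
                paused = false;
                emptyFetchCount = 0;
            }
            int fetched = source.poll();
            if (paused) {
                return;
            }
            // Assumption: the threshold counts consecutive empty polls.
            emptyFetchCount = (fetched == 0) ? emptyFetchCount + 1 : 0;
            if (emptyFetchCount >= emptyFetchThreshold) {
                source.pause(source.assignment());
                paused = true;
                emptyFetchCount = 0;
            }
        }
    }

    // In-memory source that never returns records; for the demo only.
    static final class IdleSource implements PartitionedSource {
        final Set<String> assigned = new HashSet<>(Arrays.asList("topic-0"));
        final Set<String> pausedPartitions = new HashSet<>();
        public int poll() { return 0; }
        public Set<String> assignment() { return assigned; }
        public void pause(Collection<String> p) { pausedPartitions.addAll(p); }
        public void resume(Collection<String> p) { pausedPartitions.removeAll(p); }
    }

    public static void main(String[] args) {
        IdleSource source = new IdleSource();
        PausingPollLoop loop = new PausingPollLoop(source, 3);
        for (int i = 0; i < 3; i++) {
            loop.pollOnce();                         // three empty polls in a row
        }
        System.out.println("paused after empty polls: " + loop.isPaused());
        loop.requestResume();                        // normally from the scheduler thread
        loop.pollOnce();                             // flag is consumed before poll()
        System.out.println("paused after resume request: " + loop.isPaused());
    }
}
```

With the real client, the body of onPartitionsAssigned would live in a ConsumerRebalanceListener passed to subscribe(); the client invokes that listener from inside poll(), so it runs on the polling thread. The @Scheduled method would then call only requestResume() and never touch the consumer itself.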