Thanks, Matthias. I hit the issue when I call resume from the scheduled method: I get an exception saying that KafkaConsumer is not safe for multi-threaded access. I am trying to work out how to call pause and resume on the same thread. Only one thread will be running for consumption.
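One possible way to keep every consumer call on the polling thread is to have the scheduled method only set a flag, and let the poll loop act on it. The following is a rough sketch, not tested against a real broker. `PausableConsumer` is a hypothetical stand-in interface so the example compiles without the Kafka client on the classpath; with the real client the calls would be `kafkaConsumer.pause(kafkaConsumer.assignment())` and `kafkaConsumer.resume(kafkaConsumer.assignment())`. The class name, `requestResume()` and `pollOnce()` are made up for illustration, and the sketch assumes "3 empty polls" means three consecutive empty polls.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class PauseResumeSketch {

    /** Hypothetical stand-in for the KafkaConsumer calls used in this thread. */
    interface PausableConsumer {
        int poll();     // returns the number of records fetched
        void pause();   // real client: pause(consumer.assignment())
        void resume();  // real client: resume(consumer.assignment())
    }

    static final int EMPTY_FETCH_THRESHOLD = 3;

    private final PausableConsumer consumer;
    // The only state shared between the scheduler thread and the consumer thread.
    private final AtomicBoolean resumeRequested = new AtomicBoolean(false);
    // The fields below are touched by the consumer thread only.
    private int emptyFetchCount = 0;
    private boolean paused = false;

    public PauseResumeSketch(PausableConsumer consumer) {
        this.consumer = consumer;
    }

    /** Called from the scheduled method; never touches the consumer itself. */
    public void requestResume() {
        resumeRequested.set(true);
    }

    /** One iteration of the poll loop; must run on the consumer thread. */
    public void pollOnce() {
        // Apply a pending resume request here, on the thread that owns the consumer.
        if (resumeRequested.getAndSet(false) && paused) {
            consumer.resume();
            paused = false;
            emptyFetchCount = 0;
        }
        // Keep polling even while paused so the consumer stays in the group.
        int fetched = consumer.poll();
        if (paused) {
            return;
        }
        // Count consecutive empty polls; any non-empty poll resets the counter.
        emptyFetchCount = (fetched == 0) ? emptyFetchCount + 1 : 0;
        if (emptyFetchCount >= EMPTY_FETCH_THRESHOLD) {
            consumer.pause();
            paused = true;
            emptyFetchCount = 0;
        }
    }

    public boolean isPaused() {
        return paused;
    }
}
```

With this shape the scheduled method would call only `requestResume()`, and the run loop would call `pollOnce()` until the loop is closed, so the consumer is never accessed from two threads.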
On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax <matth...@confluent.io> wrote:

> There is no issue if you call `poll()` if all partitions are paused. In
> fact, if you want to make sure that the consumer does not fall out of
> the consumer group, you must call `poll()` at regular intervals to not
> hit the `max.poll.interval.ms` timeout.
>
> -Matthias
>
> On 10/24/18 10:25 AM, pradeep s wrote:
> > Pause and resume is required since I am running a pod in Kubernetes and I
> > am not shutting down the app.
> >
> > On Tue, Oct 23, 2018 at 10:33 PM pradeep s <sreekumar.prad...@gmail.com>
> > wrote:
> >
> >> Hi,
> >> I have a requirement to have Kafka streaming start at a scheduled time and
> >> then pause the stream when the consumer poll returns empty fetches for 3 or
> >> more polls.
> >>
> >> I am starting a consumer poll loop during application startup using a
> >> single-thread executor and then pausing the consumer when the poll is
> >> returning empty for 3 polls.
> >>
> >> When the schedule kicks in, I am calling consumer.resume.
> >>
> >> Is this approach correct?
> >> Will it cause any issue if the consumer calls poll on a paused consumer?
> >>
> >> Skeleton Code
> >> ============
> >>
> >> public class OfferItemImageConsumer implements Runnable {
> >>
> >>     @Override
> >>     public void run() {
> >>         try {
> >>             do {
> >>                 ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
> >>                 writeAndPauseEmptyFetch(records);
> >>                 processRecords(records);
> >>             } while (!consumerLoopClosed.get());
> >>         } catch (RuntimeException ex) {
> >>             handleConsumerLoopException(ex);
> >>         } finally {
> >>             kafkaConsumer.close();
> >>         }
> >>     }
> >>
> >>     private void writeAndPauseEmptyFetch(ConsumerRecords<String, String> records) {
> >>         if (records.isEmpty()) {
> >>             emptyFetchCount++;
> >>         }
> >>         if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
> >>             writeImageData();
> >>             emptyFetchCount = 0;
> >>             kafkaConsumer.pause(kafkaConsumer.assignment());
> >>             consumerPaused = true;
> >>         }
> >>     }
> >>
> >> }
> >>
> >> =================================
> >>
> >> public class ItemImageStreamScheduler {
> >>     private static final int TERMINATION_TIMEOUT = 10;
> >>
> >>     private ExecutorService executorService = Executors.newSingleThreadExecutor();
> >>
> >>     private final OfferItemImageConsumer offerItemImageConsumer;
> >>     private final ItemImageStreamConfig itemImageStreamConfig;
> >>     private final KafkaConsumer<String, String> kafkaConsumer;
> >>
> >>     @EventListener(ApplicationReadyEvent.class)
> >>     void startStreaming() {
> >>         executorService.submit(offerItemImageConsumer);
> >>     }
> >>
> >>     @Scheduled
> >>     void resumeStreaming() {
> >>         kafkaConsumer.resume(kafkaConsumer.assignment());
> >>     }
> >>
> >> }
> >>
> >> Thanks
> >>
> >> Pradeep