That is correct: Kafka clients are not thread-safe.

You can use an `AtomicBoolean needToResume` that you share between both
threads and that is initially false.

In your scheduled method, you set the variable to true.

In your main consumer loop, each time before you call poll(), you check
whether the flag is set to true. If it is, you call resume() and reset
the flag to false.
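A minimal, self-contained sketch of this hand-off (plain Java threads
stand in for the @Scheduled method and the poll loop; the actual
`kafkaConsumer.resume(...)` call is shown only as a comment, since the
consumer must be touched by the polling thread alone):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ResumeFlagDemo {
    // Shared flag: the scheduler thread only sets it; the consumer
    // thread is the only one that ever touches the KafkaConsumer.
    private static final AtomicBoolean needToResume = new AtomicBoolean(false);

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the @Scheduled method: it never calls resume()
        // itself, it only flips the flag.
        Thread scheduler = new Thread(() -> needToResume.set(true));
        scheduler.start();
        scheduler.join();

        // Stand-in for the consumer loop: before each poll(), check the
        // flag. compareAndSet makes check-and-clear a single atomic step,
        // so a concurrent set() is never lost.
        if (needToResume.compareAndSet(true, false)) {
            // In the real code, this thread would now call:
            // kafkaConsumer.resume(kafkaConsumer.assignment());
            System.out.println("resumed");
        }
        System.out.println("flag=" + needToResume.get());
    }
}
```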

Hope this helps.

-Matthias


On 10/25/18 2:09 PM, pradeep s wrote:
> Thanks Matthias. I am facing an issue when I try to call resume
> from the scheduled method.
> I was getting an exception that the Kafka consumer is not safe for
> multi-threaded access. I am trying to see how I can call pause and
> resume on the same thread. There will be only one thread running for
> consumption.
> 
> 
> On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> There is no issue if you call `poll()` when all partitions are paused. In
>> fact, if you want to make sure that the consumer does not fall out of
>> the consumer group, you must call `poll()` at regular intervals so you
>> do not hit the `max.poll.interval.ms` timeout.
>>
>>
>> -Matthias
>>
>> On 10/24/18 10:25 AM, pradeep s wrote:
>>> Pause and resume are required since I am running a pod in Kubernetes and I
>>> am not shutting down the app.
>>>
>>> On Tue, Oct 23, 2018 at 10:33 PM pradeep s <sreekumar.prad...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> I have a requirement to have Kafka streaming start at a scheduled time
>>>> and then pause the stream when the consumer poll returns empty fetches
>>>> for 3 or more polls.
>>>>
>>>> I am starting a consumer poll loop during application startup using a
>>>> single-thread executor and then pausing the consumer when the poll
>>>> returns empty for 3 polls.
>>>>
>>>> When the schedule kicks in, I am calling *consumer.resume()*.
>>>>
>>>> Is this approach correct?
>>>> Will it cause any issue if poll() is called on a paused consumer?
>>>>
>>>> Skeleton Code
>>>> ============
>>>>
>>>> public class *OfferItemImageConsumer* implements Runnable {
>>>>
>>>> @Override
>>>> public void run() {
>>>>     try {
>>>>         do {
>>>>             ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
>>>>             writeAndPauseEmptyFetch(records);
>>>>             processRecords(records);
>>>>         } while (!consumerLoopClosed.get());
>>>>     } catch (RuntimeException ex) {
>>>>         handleConsumerLoopException(ex);
>>>>     } finally {
>>>>         kafkaConsumer.close();
>>>>     }
>>>> }
>>>>
>>>> private void writeAndPauseEmptyFetch(ConsumerRecords<String, String> records) {
>>>>     if (records.isEmpty()) {
>>>>         emptyFetchCount++;
>>>>     }
>>>>     if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
>>>>         writeImageData();
>>>>         emptyFetchCount = 0;
>>>>         kafkaConsumer.pause(kafkaConsumer.assignment());
>>>>         consumerPaused = true;
>>>>     }
>>>> }
>>>>
>>>> }
>>>>
>>>> =================================
>>>>
>>>> public class *ItemImageStreamScheduler* {
>>>>     private static final int TERMINATION_TIMEOUT = 10;
>>>>
>>>>     private ExecutorService executorService = Executors.newSingleThreadExecutor();
>>>>
>>>>     private final OfferItemImageConsumer offerItemImageConsumer;
>>>>     private final ItemImageStreamConfig itemImageStreamConfig;
>>>>     private final KafkaConsumer<String, String> kafkaConsumer;
>>>>
>>>>     @EventListener(ApplicationReadyEvent.class)
>>>>     void startStreaming() {
>>>>         executorService.submit(offerItemImageConsumer);
>>>>     }
>>>>
>>>>     @Scheduled
>>>>     void resumeStreaming() {
>>>>         kafkaConsumer.resume(kafkaConsumer.assignment());
>>>>     }
>>>> }
>>>>
>>>> Thanks
>>>>
>>>> Pradeep
>>>>
>>>>
>>>
>>
>>
> 
