ryucc commented on PR #1674:
URL: https://github.com/apache/samza/pull/1674#issuecomment-1603384515

   Then, in this case, new `SystemConsumer`s shouldn't be created on 
`KafkaCheckpointManager.start()`.
   
   Reading the logs another way: new `SystemConsumer`s are started, but 
never used for `readCheckpoints`. They are also never closed, so we see them 
accumulating in the heap.
   
   This opens up a third solution:
   ```
   override def start(): Unit = {
     ...
     // Start the consumer only once, even if start() is called again.
     if (!systemConsumerStarted) {
       systemConsumer.start()
       systemConsumerStarted = true
     }
   }
   ```
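
   To make the guard concrete, here is a minimal, self-contained sketch of the same idea outside of Samza. `Consumer`, `CountingConsumer`, `GuardedManager`, and `GuardDemo` are hypothetical stand-ins, not the real `SystemConsumer` or `KafkaCheckpointManager`, and the matching guard in `stop()` is my assumption rather than part of the proposal above.

   ```scala
   // Stand-in for a consumer with start/stop lifecycle calls (hypothetical,
   // not Samza's SystemConsumer interface).
   trait Consumer {
     def start(): Unit
     def stop(): Unit
   }

   // Hypothetical test double that counts how often each lifecycle call happens.
   class CountingConsumer extends Consumer {
     var starts = 0
     var stops = 0
     override def start(): Unit = starts += 1
     override def stop(): Unit = stops += 1
   }

   // Hypothetical manager illustrating the guard; not the real KafkaCheckpointManager.
   class GuardedManager(consumer: Consumer) {
     private var consumerStarted = false

     def start(): Unit = {
       // Only the first call actually starts the consumer.
       if (!consumerStarted) {
         consumer.start()
         consumerStarted = true
       }
     }

     def stop(): Unit = {
       // Assumption: stop mirrors the guard so the consumer is stopped at most once.
       if (consumerStarted) {
         consumer.stop()
         consumerStarted = false
       }
     }
   }

   object GuardDemo {
     def main(args: Array[String]): Unit = {
       val consumer = new CountingConsumer
       val manager = new GuardedManager(consumer)
       manager.start()
       manager.start() // second call is a no-op because of the flag
       manager.stop()
       println(s"starts=${consumer.starts}, stops=${consumer.stops}")
     }
   }
   ```

   Note that the plain `Boolean` flag in this sketch is not thread-safe; if `start()` can be called concurrently, the check and the update would need to be synchronized.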

