xiaoqingwanga commented on code in PR #16303: URL: https://github.com/apache/kafka/pull/16303#discussion_r1639075718
########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java: ########## @@ -115,40 +118,54 @@ public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataE this.pollTimeoutMs = pollTimeoutMs; this.offsetFetchRetryIntervalMs = offsetFetchRetryIntervalMs; this.time = Objects.requireNonNull(time); + this.isInternalConsumerClosed = new AtomicBoolean(false); this.uninitializedAt = time.milliseconds(); } @Override public void run() { log.info("Starting consumer task thread."); while (!isClosed) { - try { - if (hasAssignmentChanged) { - maybeWaitForPartitionAssignments(); - } + ingestRecords(); + } + closeConsumer(); + log.info("Exited from consumer task thread"); + } - log.trace("Polling consumer to receive remote log metadata topic records"); - final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs)); - for (ConsumerRecord<byte[], byte[]> record : consumerRecords) { - processConsumerRecord(record); - } - maybeMarkUserPartitionsAsReady(); - } catch (final WakeupException ex) { - // ignore logging the error - isClosed = true; - } catch (final RetriableException ex) { - log.warn("Retriable error occurred while processing the records. 
Retrying...", ex); - } catch (final Exception ex) { - isClosed = true; - log.error("Error occurred while processing the records", ex); + // public for testing + public void ingestRecords() { + try { + if (hasAssignmentChanged) { + maybeWaitForPartitionAssignments(); } + + log.trace("Polling consumer to receive remote log metadata topic records"); + final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollTimeoutMs)); + for (ConsumerRecord<byte[], byte[]> record : consumerRecords) { + processConsumerRecord(record); + } + maybeMarkUserPartitionsAsReady(); + } catch (final WakeupException ex) { + // ignore logging the error + isClosed = true; + closeConsumer(); + } catch (final RetriableException ex) { + log.warn("Retriable error occurred while processing the records. Retrying...", ex); + } catch (final Exception ex) { + isClosed = true; + log.error("Error occurred while processing the records", ex); + closeConsumer(); } - try { - consumer.close(Duration.ofSeconds(30)); - } catch (final Exception e) { - log.error("Error encountered while closing the consumer", e); + } + + private void closeConsumer() { + if (isInternalConsumerClosed.compareAndSet(false, true)) { Review Comment: ConsumerTask is an important component, so making fewer changes to it is safer. Without the ability to close the consumer, ingestRecords may seem a bit incomplete, but it is, after all, an internal method. I agree that keeping it simple is better 👍. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org