xiaoqingwanga commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1639075718


##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -115,40 +118,54 @@ public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataE
         this.pollTimeoutMs = pollTimeoutMs;
         this.offsetFetchRetryIntervalMs = offsetFetchRetryIntervalMs;
         this.time = Objects.requireNonNull(time);
+        this.isInternalConsumerClosed = new AtomicBoolean(false);
         this.uninitializedAt = time.milliseconds();
     }
 
     @Override
     public void run() {
         log.info("Starting consumer task thread.");
         while (!isClosed) {
-            try {
-                if (hasAssignmentChanged) {
-                    maybeWaitForPartitionAssignments();
-                }
+            ingestRecords();
+        }
+        closeConsumer();
+        log.info("Exited from consumer task thread");
+    }
 
-                log.trace("Polling consumer to receive remote log metadata 
topic records");
-                final ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(pollTimeoutMs));
-                for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
-                    processConsumerRecord(record);
-                }
-                maybeMarkUserPartitionsAsReady();
-            } catch (final WakeupException ex) {
-                // ignore logging the error
-                isClosed = true;
-            } catch (final RetriableException ex) {
-                log.warn("Retriable error occurred while processing the 
records. Retrying...", ex);
-            } catch (final Exception ex) {
-                isClosed = true;
-                log.error("Error occurred while processing the records", ex);
+    // public for testing
+    public void ingestRecords() {
+        try {
+            if (hasAssignmentChanged) {
+                maybeWaitForPartitionAssignments();
             }
+
+            log.trace("Polling consumer to receive remote log metadata topic 
records");
+            final ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(pollTimeoutMs));
+            for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+                processConsumerRecord(record);
+            }
+            maybeMarkUserPartitionsAsReady();
+        } catch (final WakeupException ex) {
+            // ignore logging the error
+            isClosed = true;
+            closeConsumer();
+        } catch (final RetriableException ex) {
+            log.warn("Retriable error occurred while processing the records. 
Retrying...", ex);
+        } catch (final Exception ex) {
+            isClosed = true;
+            log.error("Error occurred while processing the records", ex);
+            closeConsumer();
         }
-        try {
-            consumer.close(Duration.ofSeconds(30));
-        } catch (final Exception e) {
-            log.error("Error encountered while closing the consumer", e);
+    }
+
+    private void closeConsumer() {
+        if (isInternalConsumerClosed.compareAndSet(false, true)) {

Review Comment:
   The ConsumerTask is an important component, and making fewer changes is 
safer. If ingestRecords does not have the ability to close the consumer, it may 
seem a bit incomplete, but after all, it's an internal method. 
   I think keeping it simple is indeed better👍.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to