scwhittle commented on code in PR #32162:
URL: https://github.com/apache/beam/pull/32162#discussion_r1793240554


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -569,22 +571,28 @@ private void consumerPollLoop() {
         try {
           if (records.isEmpty()) {
             records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
-          } else if (availableRecordsQueue.offer(
-              records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) {
+            atleastOnePollCompleted.set(true);

Review Comment:
   I think we should remove this here and only set it once the records have
   been offered successfully (already done below). That avoids a race where the
   other thread sees this as true but the item is not yet in the queue.
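
   A minimal sketch of that ordering, not the actual KafkaUnboundedReader code. The class OfferThenFlag, its methods, the String batch type, and the queue capacity are all hypothetical stand-ins chosen for illustration. The only point is that the flag is set after offer() returns true, so a reader that observes the flag can assume a hand-off has already happened.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the poll-thread side; not the real reader class.
final class OfferThenFlag {
  // Capacity 1 and the String element type are assumptions made for the sketch.
  private final BlockingQueue<String> availableRecordsQueue = new ArrayBlockingQueue<>(1);
  private final AtomicBoolean atLeastOnePollCompleted = new AtomicBoolean();

  /** Hands a batch to the reader thread; the flag flips only after the hand-off succeeds. */
  boolean publish(String batch, long timeoutMillis) throws InterruptedException {
    if (availableRecordsQueue.offer(batch, timeoutMillis, TimeUnit.MILLISECONDS)) {
      atLeastOnePollCompleted.set(true);
      return true;
    }
    return false; // offer timed out: the flag is left untouched
  }

  boolean pollCompleted() {
    return atLeastOnePollCompleted.get();
  }

  /** Reader side: waits up to the timeout for a batch, returns null if none arrives. */
  String take(long timeoutMillis) throws InterruptedException {
    return availableRecordsQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
  }
}
```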



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -77,6 +77,8 @@
  */
 class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
 
+  AtomicBoolean atleastOnePollCompleted = new AtomicBoolean();

Review Comment:
   This field should be private final.



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -621,19 +629,24 @@ void finalizeCheckpointMarkAsync(KafkaCheckpointMark checkpointMark) {
     checkpointMarkCommitsEnqueued.inc();
   }
 
+  // Ensure atleast one consumer poll has completed since this was last called.
   private void nextBatch() throws IOException {
     curBatch = Collections.emptyIterator();
-
     ConsumerRecords<byte[], byte[]> records;
-    try {
-      // poll available records, wait (if necessary) up to the specified timeout.
-      records =
-          availableRecordsQueue.poll(recordsDequeuePollTimeout.getMillis(), TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.warn("{}: Unexpected", this, e);
-      return;
-    }
+    do {
+      // try until background poll has completed atleast once
+      try {
+        // poll available records, wait (if necessary) up to the specified timeout.
+        records =
+            availableRecordsQueue.poll(
+                recordsDequeuePollTimeout.getMillis(), TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.warn("{}: Unexpected", this, e);
+        return;
+      }
+
+    } while (!atleastOnePollCompleted.get());
 

Review Comment:
   Can't you just set atLeastOnePollCompleted to false here, in a single spot?
   Resetting it for the InterruptedException case doesn't seem necessary.
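
   A sketch of the single-reset idea, under stated assumptions: SingleResetReader is a hypothetical class, String stands in for ConsumerRecords, and the extra `batch == null` term in the loop condition is an assumption of this sketch (so a received batch is never overwritten by another poll), not something taken from the PR. The flag is cleared in exactly one place, after the loop, and the interrupted path returns without touching it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the reader-thread side; not the real reader class.
final class SingleResetReader {
  final BlockingQueue<String> availableRecordsQueue = new ArrayBlockingQueue<>(1);
  final AtomicBoolean atLeastOnePollCompleted = new AtomicBoolean();

  /** Returns the next batch, or null if a poll completed without data or the thread was interrupted. */
  String nextBatch(long timeoutMillis) {
    String batch;
    do {
      try {
        // Wait (if necessary) up to the timeout for a batch from the poll thread.
        batch = availableRecordsQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return null; // no reset on this path
      }
      // Keep waiting only while there is no batch and no poll has completed.
    } while (batch == null && !atLeastOnePollCompleted.get());
    atLeastOnePollCompleted.set(false); // the single reset point
    return batch;
  }
}
```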



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -77,6 +77,8 @@
  */
 class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
 
+  AtomicBoolean atleastOnePollCompleted = new AtomicBoolean();

Review Comment:
   nit: capitalize the L in least



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -569,22 +571,28 @@ private void consumerPollLoop() {
         try {
           if (records.isEmpty()) {
             records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
-          } else if (availableRecordsQueue.offer(
-              records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) {
+            atleastOnePollCompleted.set(true);
+          }
+          if (!records.isEmpty()
+              && availableRecordsQueue.offer(
+                  records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) {
             records = ConsumerRecords.empty();
+            atleastOnePollCompleted.set(true);
           }
-
           commitCheckpointMark();
         } catch (InterruptedException e) {
+          atleastOnePollCompleted.set(true);

Review Comment:
   Should we instead be throwing or setting consumerPollException in these
   cases?
   I'm a little worried that if we set this to true once here, it will be set
   back to false when the next batch read is attempted, never be set to true
   again, and the other thread will block forever.

   Instead of setting the poll-completed bit here, it seems better to record
   the error state and have the other thread check for that error state in
   addition to atLeastOnePollCompleted, and not loop in that case.
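
   A rough sketch of that alternative, assuming the error is recorded in an AtomicReference named consumerPollException (the name comes from the comment above; its type here is an assumption). ErrorAwareReader and the String batch type are hypothetical, and the `batch == null` term in the loop condition is also an assumption of this sketch. The reader loop checks the error state on every iteration, so a failed poll thread cannot leave it spinning.

```java
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the reader-thread side; not the real reader class.
final class ErrorAwareReader {
  final BlockingQueue<String> availableRecordsQueue = new ArrayBlockingQueue<>(1);
  final AtomicBoolean atLeastOnePollCompleted = new AtomicBoolean();
  // Set by the poll thread when it fails or is interrupted (assumed shape).
  final AtomicReference<Exception> consumerPollException = new AtomicReference<>();

  /** Waits for a batch, but stops waiting as soon as the poll thread has reported an error. */
  String nextBatch(long timeoutMillis) throws IOException {
    String batch;
    do {
      try {
        batch = availableRecordsQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return null;
      }
      // Surface the recorded error instead of looping on a flag that may never flip.
      Exception error = consumerPollException.get();
      if (error != null) {
        throw new IOException("Background poll failed", error);
      }
    } while (batch == null && !atLeastOnePollCompleted.get());
    return batch;
  }
}
```

   With this shape the poll thread's catch block would record the exception rather than set atLeastOnePollCompleted, so the flag keeps a single meaning: a poll actually finished.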


