gemini-code-assist[bot] commented on code in PR #38361:
URL: https://github.com/apache/beam/pull/38361#discussion_r3174962832


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -2130,12 +2130,16 @@ public void 
processElement(OutputReceiver<KafkaSourceDescriptor> receiver) {
             } else {
               for (String topic : topics) {
                 List<PartitionInfo> partitionInfoList = 
consumer.partitionsFor(topic);
-                if (partitionInfoList == null || partitionInfoList.isEmpty()) {
+                if (logTopicVerification == null || !logTopicVerification) {
+                  checkState(
+                      partitionInfoList != null && 
!partitionInfoList.isEmpty(),
+                      "Could not find any partitions info for topic %s. Please 
check Kafka configuration and make sure that provided topics exist.",
+                      topic);
+                } else {
                   LOG.warn(
-                      "Could not find any partitions info for topic {}. Please 
check Kafka "
-                          + "configuration and make sure that the provided 
topics exist.",
+                      "Could not find any partitions info for topic {}. Please 
check Kafka configuration "
+                          + "and make sure that the provided topics exist.",
                       topic);
-                  continue;
                 }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The logic in this revert is incorrect. If `logTopicVerification` is true, 
the `else` block executes for every topic, logging a warning even when 
`partitionInfoList` is valid. Furthermore, if `partitionInfoList` is null, the 
code will proceed to the loop at line 2145 and throw a `NullPointerException` 
because the `continue` statement was removed. The warning and skip logic should 
only trigger if `partitionInfoList` is actually null or empty.
   
   ```suggestion
                   if (logTopicVerification == null || !logTopicVerification) {
                     checkState(
                         partitionInfoList != null && 
!partitionInfoList.isEmpty(),
                         "Could not find any partitions info for topic %s. 
Please check Kafka configuration and make sure that provided topics exist.",
                         topic);
                   } else if (partitionInfoList == null || 
partitionInfoList.isEmpty()) {
                     LOG.warn(
                         "Could not find any partitions info for topic {}. 
Please check Kafka configuration "
                             + "and make sure that the provided topics exist.",
                         topic);
                     continue;
                   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to