gemini-code-assist[bot] commented on code in PR #38361:
URL: https://github.com/apache/beam/pull/38361#discussion_r3174962832
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -2130,12 +2130,16 @@ public void processElement(OutputReceiver<KafkaSourceDescriptor> receiver) {
           } else {
             for (String topic : topics) {
               List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
-              if (partitionInfoList == null || partitionInfoList.isEmpty()) {
+              if (logTopicVerification == null || !logTopicVerification) {
+                checkState(
+                    partitionInfoList != null && !partitionInfoList.isEmpty(),
+                    "Could not find any partitions info for topic %s. Please check Kafka configuration and make sure that provided topics exist.",
+                    topic);
+              } else {
                 LOG.warn(
-                    "Could not find any partitions info for topic {}. Please check Kafka "
-                        + "configuration and make sure that the provided topics exist.",
+                    "Could not find any partitions info for topic {}. Please check Kafka configuration "
+                        + "and make sure that the provided topics exist.",
                     topic);
-                continue;
               }
Review Comment:

The logic in this revert is incorrect. If `logTopicVerification` is true,
the `else` block executes for every topic, logging a warning even when
`partitionInfoList` is valid. Furthermore, if `partitionInfoList` is null, the
code will proceed to the loop at line 2145 and throw a `NullPointerException`
because the `continue` statement was removed. The warning and skip logic should
only trigger if `partitionInfoList` is actually null or empty.
```suggestion
              if (logTopicVerification == null || !logTopicVerification) {
                checkState(
                    partitionInfoList != null && !partitionInfoList.isEmpty(),
                    "Could not find any partitions info for topic %s. Please check Kafka configuration and make sure that provided topics exist.",
                    topic);
              } else if (partitionInfoList == null || partitionInfoList.isEmpty()) {
                LOG.warn(
                    "Could not find any partitions info for topic {}. Please check Kafka configuration "
                        + "and make sure that the provided topics exist.",
                    topic);
                continue;
              }
```
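
For reference, the intended control flow can be exercised outside of Beam with a minimal standalone sketch. This is not the `KafkaIO` code: the class `TopicVerificationSketch`, the method `resolve`, and the map used in place of `consumer.partitionsFor(topic)` are hypothetical stand-ins, and an `IllegalStateException` is thrown directly where the real code calls `checkState`. It only illustrates that the warn-and-skip branch should fire for missing topics alone, and that `continue` is what keeps a null list from reaching the partition loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TopicVerificationSketch {

  // Hypothetical stand-in: the map plays the role of consumer.partitionsFor(topic),
  // mapping each topic to its partition ids (null or empty means "not found").
  static List<String> resolve(
      Map<String, List<Integer>> partitionsByTopic, Boolean logTopicVerification) {
    List<String> descriptors = new ArrayList<>();
    for (Map.Entry<String, List<Integer>> entry : partitionsByTopic.entrySet()) {
      String topic = entry.getKey();
      List<Integer> partitions = entry.getValue();
      boolean missing = partitions == null || partitions.isEmpty();

      if (logTopicVerification == null || !logTopicVerification) {
        // Strict mode: a missing topic fails fast (assumed equivalent of checkState).
        if (missing) {
          throw new IllegalStateException(
              "Could not find any partitions info for topic " + topic);
        }
      } else if (missing) {
        // Lenient mode: warn only for missing topics, then skip them.
        System.err.println("WARN: could not find any partitions info for topic " + topic);
        continue;
      }

      // Reached only with a non-null, non-empty list.
      for (int partition : partitions) {
        descriptors.add(topic + "-" + partition);
      }
    }
    return descriptors;
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> partitionsByTopic = new LinkedHashMap<>();
    partitionsByTopic.put("orders", Arrays.asList(0, 1));
    partitionsByTopic.put("missing", null);
    // Lenient mode skips "missing" and keeps the valid topic.
    System.out.println(resolve(partitionsByTopic, true));
  }
}
```

With `logTopicVerification` set to `true`, the valid topic produces no warning and the missing one is skipped; with `null` or `false`, the missing topic raises an exception before the partition loop is reached.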