pvillard31 commented on code in PR #10689: URL: https://github.com/apache/nifi/pull/10689#discussion_r2644334196
########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/KafkaConnectionVerifier.java: ########## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.service; + +import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.DescribeClusterOptions; +import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.TopicListing; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.logging.ComponentLog; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED; +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SKIPPED; +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL; + +class KafkaConnectionVerifier { + static final String ADDRESSES_STEP = "Broker Addresses"; + static final String CONFIGURATION_STEP = "Broker Configuration"; + static final String BROKER_CONNECTION_STEP = "Broker Connection"; + static final String NODE_CONNECTION_STEP = "Node Connection"; + static final String CLUSTER_DESCRIPTION_STEP = "Cluster Description"; + static final String CLUSTER_CONNECTION_STEP = "Cluster Connection"; + static final String TOPIC_LISTING_STEP = "Topic Listing"; + + private static final int SOCKET_CONNECT_TIMEOUT = 2500; + private static final int VERIFY_TIMEOUT = 5000; + private static final TimeUnit VERIFY_TIMEOUT_UNIT = TimeUnit.MILLISECONDS; + + /** + * Verify Connection to Kafka Brokers + * + * @param verificationLogger Logger specific to verification warnings and errors + * @param resolvedProperties Resolved properties from Controller Service implementation containing required values for communicating with Kafka Brokers + * + * @return Verification Results + */ + List<ConfigVerificationResult> verify(final ComponentLog verificationLogger, final Properties resolvedProperties) { + final List<ConfigVerificationResult> results = new ArrayList<>(); + + // Get validated Bootstrap Addresses before checking socket connections + final List<InetSocketAddress> bootstrapAddresses = getBootstrapAddresses(verificationLogger, resolvedProperties, results); + if (bootstrapAddresses.isEmpty()) { + results.add( + new ConfigVerificationResult.Builder() + .verificationStepName(CONFIGURATION_STEP) + .outcome(FAILED) + .explanation("Validated Bootstrap Servers not found") + .build() + ); + } else { + // Get connected Bootstrap Addresses before checking Topics + final List<InetSocketAddress> connectedAddresses = getConnectedAddresses(verificationLogger, results, bootstrapAddresses); + if (connectedAddresses.isEmpty()) { + results.add( + new ConfigVerificationResult.Builder() + .verificationStepName(BROKER_CONNECTION_STEP) + .outcome(FAILED) + .explanation("Connected Bootstrap Servers not found") + .build() + ); + } else { + try (final Admin admin = Admin.create(resolvedProperties)) { + final List<Node> clusterNodes = getClusterNodes(verificationLogger, admin, results); + if (clusterNodes.isEmpty()) { + verificationLogger.info("Cluster Nodes not found"); + } else { + verifyClusterNodes(verificationLogger, connectedAddresses, clusterNodes, results); + } + + // Verify Topics regardless of Cluster Node status because Describe Cluster is independent of listing Topics + verifyTopics(verificationLogger, admin, results); + } + } + } + + return results; + } + + private List<InetSocketAddress> getBootstrapAddresses(final ComponentLog verificationLogger, final Properties resolvedProperties, final List<ConfigVerificationResult> results) { + final List<InetSocketAddress> bootstrapAddresses = new ArrayList<>(); + + try { + final AdminClientConfig adminClientConfig = new AdminClientConfig(resolvedProperties); + final List<InetSocketAddress> validatedAddresses = ClientUtils.parseAndValidateAddresses(adminClientConfig); + bootstrapAddresses.addAll(validatedAddresses); + + results.add( + new ConfigVerificationResult.Builder() + .verificationStepName(ADDRESSES_STEP) + .outcome(SUCCESSFUL) + .explanation("Addresses Validated [%d]".formatted(validatedAddresses.size())) Review Comment: ```suggestion .explanation("Addresses validated [%d]".formatted(validatedAddresses.size())) ``` To be consistent with, for example, "Topics found" ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/KafkaConnectionVerifier.java: ########## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.service; + +import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.DescribeClusterOptions; +import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.TopicListing; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.logging.ComponentLog; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED; +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SKIPPED; +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL; + +class KafkaConnectionVerifier { + static final String ADDRESSES_STEP = "Broker Addresses"; + static final String CONFIGURATION_STEP = "Broker Configuration"; + static final String BROKER_CONNECTION_STEP = "Broker Connection"; + static final String NODE_CONNECTION_STEP = "Node Connection"; + static final String CLUSTER_DESCRIPTION_STEP = "Cluster Description"; + static final String CLUSTER_CONNECTION_STEP = "Cluster Connection"; + static final String TOPIC_LISTING_STEP = "Topic Listing"; + + private static final int SOCKET_CONNECT_TIMEOUT = 2500; + private static final int VERIFY_TIMEOUT = 5000; + private static final TimeUnit VERIFY_TIMEOUT_UNIT = TimeUnit.MILLISECONDS; + + /** + * Verify Connection to Kafka Brokers + * + * @param verificationLogger Logger specific to verification warnings and errors + * @param resolvedProperties Resolved properties from Controller Service implementation containing required values for communicating with Kafka Brokers + * + * @return Verification Results + */ + List<ConfigVerificationResult> verify(final ComponentLog verificationLogger, final Properties resolvedProperties) { + final List<ConfigVerificationResult> results = new ArrayList<>(); + + // Get validated Bootstrap Addresses before checking socket connections + final List<InetSocketAddress> bootstrapAddresses = getBootstrapAddresses(verificationLogger, resolvedProperties, results); + if (bootstrapAddresses.isEmpty()) { + results.add( + new ConfigVerificationResult.Builder() + .verificationStepName(CONFIGURATION_STEP) + .outcome(FAILED) + .explanation("Validated Bootstrap Servers not found") + .build() + ); + } else { + // Get connected Bootstrap Addresses before checking Topics + final List<InetSocketAddress> connectedAddresses = getConnectedAddresses(verificationLogger, results, bootstrapAddresses); + if (connectedAddresses.isEmpty()) { + results.add( + new ConfigVerificationResult.Builder() + .verificationStepName(BROKER_CONNECTION_STEP) + .outcome(FAILED) + .explanation("Connected Bootstrap Servers not found") + .build() + ); + } else { + try (final Admin admin = Admin.create(resolvedProperties)) { + final List<Node> clusterNodes = getClusterNodes(verificationLogger, admin, results); + if (clusterNodes.isEmpty()) { + verificationLogger.info("Cluster Nodes not found"); + } else { + verifyClusterNodes(verificationLogger, connectedAddresses, clusterNodes, results); + } + + // Verify Topics regardless of Cluster Node status because Describe Cluster is independent of listing Topics + verifyTopics(verificationLogger, admin, results); + } + } + } + + return results; + } + + private List<InetSocketAddress> getBootstrapAddresses(final ComponentLog verificationLogger, final Properties resolvedProperties, final List<ConfigVerificationResult> results) { + final List<InetSocketAddress> bootstrapAddresses = new ArrayList<>(); + + try { + final AdminClientConfig adminClientConfig = new AdminClientConfig(resolvedProperties); + final List<InetSocketAddress> validatedAddresses = ClientUtils.parseAndValidateAddresses(adminClientConfig); + bootstrapAddresses.addAll(validatedAddresses); + + results.add( + new ConfigVerificationResult.Builder() + .verificationStepName(ADDRESSES_STEP) + .outcome(SUCCESSFUL) + .explanation("Addresses Validated [%d]".formatted(validatedAddresses.size())) + .build() + ); + } catch (final Exception e) { + verificationLogger.error("Broker Address verification failed", e); + results.add( + new ConfigVerificationResult.Builder() + .verificationStepName(ADDRESSES_STEP) + .outcome(FAILED) + .explanation(e.getMessage()) + .build() + ); + } + + return bootstrapAddresses; + } + + private List<InetSocketAddress> getConnectedAddresses(final ComponentLog verificationLogger, final List<ConfigVerificationResult> results, final List<InetSocketAddress> bootstrapAddresses) { + final List<InetSocketAddress> connectedAddresses = new ArrayList<>(); Review Comment: Should we use a thread-safe list here? Given that we have virtual threads that execute in parallel, we could have a ConcurrentModificationException, no? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
