This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e45d5d7d6f NIFI-14630 Corrected usage of KafkaClientComponent interface (#9993)
e45d5d7d6f is described below

commit e45d5d7d6fd0b31af3ac6ecbca51adfa0555a7d4
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Tue Jun 17 16:11:06 2025 +0200

    NIFI-14630 Corrected usage of KafkaClientComponent interface (#9993)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../apache/nifi/kafka/processors/PublishKafka.java | 12 +++-
 .../kafka/service/Kafka3ConnectionService.java     | 84 +---------------------
 .../shared/component/KafkaClientComponent.java     | 21 +++---
 .../shared/component/KafkaPublishComponent.java    | 34 ---------
 4 files changed, 23 insertions(+), 128 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
index c61dd0d2ea..4b1d7cef0e 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
@@ -54,7 +54,6 @@ import 
org.apache.nifi.kafka.service.api.producer.PublishContext;
 import org.apache.nifi.kafka.service.api.producer.RecordSummary;
 import org.apache.nifi.kafka.service.api.record.KafkaRecord;
 import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
-import org.apache.nifi.kafka.shared.component.KafkaPublishComponent;
 import org.apache.nifi.kafka.shared.property.FailureStrategy;
 import org.apache.nifi.kafka.shared.property.KeyEncoding;
 import org.apache.nifi.kafka.shared.property.PublishStrategy;
@@ -104,7 +103,7 @@ import java.util.stream.Collectors;
 @WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Kafka for this FlowFile. This attribute is added 
only to "
         + "FlowFiles that are routed to success.")
 @SeeAlso({ConsumeKafka.class})
-public class PublishKafka extends AbstractProcessor implements 
KafkaPublishComponent, VerifiableProcessor {
+public class PublishKafka extends AbstractProcessor implements 
VerifiableProcessor {
     protected static final String MSG_COUNT = "msg.count";
 
     public static final PropertyDescriptor CONNECTION_SERVICE = new 
PropertyDescriptor.Builder()
@@ -123,6 +122,15 @@ public class PublishKafka extends AbstractProcessor 
implements KafkaPublishCompo
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
+    public static final PropertyDescriptor FAILURE_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Failure Strategy")
+            .displayName("Failure Strategy")
+            .description("Specifies how the processor handles a FlowFile if it 
is unable to publish the data to Kafka")
+            .required(true)
+            .allowableValues(FailureStrategy.class)
+            .defaultValue(FailureStrategy.ROUTE_TO_FAILURE)
+            .build();
+
     static final PropertyDescriptor DELIVERY_GUARANTEE = new 
PropertyDescriptor.Builder()
             .name("acks")
             .displayName("Delivery Guarantee")
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index cb19eae037..8f95f4d03a 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -50,13 +50,13 @@ import 
org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
 import org.apache.nifi.kafka.service.consumer.Subscription;
 import org.apache.nifi.kafka.service.producer.Kafka3ProducerService;
 import org.apache.nifi.kafka.service.security.OAuthBearerLoginCallbackHandler;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
 import org.apache.nifi.kafka.shared.property.IsolationLevel;
 import org.apache.nifi.kafka.shared.property.SaslMechanism;
 import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
 import 
org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
 import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
 import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
-import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -76,8 +76,6 @@ import java.util.regex.Pattern;
 import static 
org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
 import static 
org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
 import static 
org.apache.nifi.kafka.service.security.OAuthBearerLoginCallbackHandler.PROPERTY_KEY_NIFI_OAUTH_2_ACCESS_TOKEN_PROVIDER;
-import static 
org.apache.nifi.kafka.shared.component.KafkaClientComponent.KERBEROS_SERVICE_NAME;
-import static 
org.apache.nifi.kafka.shared.component.KafkaClientComponent.OAUTH2_ACCESS_TOKEN_PROVIDER_SERVICE;
 import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SASL_LOGIN_CALLBACK_HANDLER_CLASS;
 import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_LOCATION;
 import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_PASSWORD;
@@ -94,85 +92,7 @@ import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUS
                 + " For the list of available Kafka properties please refer 
to: http://kafka.apache.org/documentation.html#configuration.",
         expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT)
 @CapabilityDescription("Provides and manages connections to Kafka Brokers for 
producer or consumer operations.")
-public class Kafka3ConnectionService extends AbstractControllerService 
implements KafkaConnectionService, VerifiableControllerService {
-
-    public static final PropertyDescriptor BOOTSTRAP_SERVERS = new 
PropertyDescriptor.Builder()
-            .name("bootstrap.servers")
-            .displayName("Bootstrap Servers")
-            .description("Comma-separated list of Kafka Bootstrap Servers in 
the format host:port. Corresponds to Kafka bootstrap.servers property")
-            .required(true)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .build();
-
-    public static final PropertyDescriptor SECURITY_PROTOCOL = new 
PropertyDescriptor.Builder()
-            .name("security.protocol")
-            .displayName("Security Protocol")
-            .description("Security protocol used to communicate with brokers. 
Corresponds to Kafka Client security.protocol property")
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .allowableValues(SecurityProtocol.values())
-            .defaultValue(SecurityProtocol.PLAINTEXT.name())
-            .build();
-
-    public static final PropertyDescriptor SASL_MECHANISM = new 
PropertyDescriptor.Builder()
-            .name("sasl.mechanism")
-            .displayName("SASL Mechanism")
-            .description("SASL mechanism used for authentication. Corresponds 
to Kafka Client sasl.mechanism property")
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .allowableValues(SaslMechanism.getAvailableSaslMechanisms())
-            .defaultValue(SaslMechanism.GSSAPI.getValue())
-            .dependsOn(SECURITY_PROTOCOL,
-                    SecurityProtocol.SASL_PLAINTEXT.name(),
-                    SecurityProtocol.SASL_SSL.name())
-            .build();
-
-    public static final PropertyDescriptor SASL_USERNAME = new 
PropertyDescriptor.Builder()
-            .name("sasl.username")
-            .displayName("SASL Username")
-            .description("Username provided with configured password when 
using PLAIN or SCRAM SASL Mechanisms")
-            .required(true)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
-            .dependsOn(
-                    SASL_MECHANISM,
-                    SaslMechanism.PLAIN.getValue(),
-                    SaslMechanism.SCRAM_SHA_256.getValue(),
-                    SaslMechanism.SCRAM_SHA_512.getValue()
-            )
-            .build();
-
-    public static final PropertyDescriptor SASL_PASSWORD = new 
PropertyDescriptor.Builder()
-            .name("sasl.password")
-            .displayName("SASL Password")
-            .description("Password provided with configured username when 
using PLAIN or SCRAM SASL Mechanisms")
-            .required(true)
-            .sensitive(true)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
-            .dependsOn(
-                    SASL_MECHANISM,
-                    SaslMechanism.PLAIN.getValue(),
-                    SaslMechanism.SCRAM_SHA_256.getValue(),
-                    SaslMechanism.SCRAM_SHA_512.getValue()
-            )
-            .build();
-
-    public static final PropertyDescriptor 
SELF_CONTAINED_KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
-            .name("kerberos-user-service")
-            .displayName("Kerberos User Service")
-            .description("Service supporting user authentication with 
Kerberos")
-            
.identifiesControllerService(SelfContainedKerberosUserService.class)
-            .required(false)
-            .build();
-
-    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
-            .name("SSL Context Service")
-            .description("Service supporting SSL communication with Kafka 
brokers")
-            .required(false)
-            .identifiesControllerService(SSLContextService.class)
-            .build();
+public class Kafka3ConnectionService extends AbstractControllerService 
implements KafkaConnectionService, VerifiableControllerService, 
KafkaClientComponent {
 
     public static final PropertyDescriptor TRANSACTION_ISOLATION_LEVEL = new 
PropertyDescriptor.Builder()
             .name("isolation.level")
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
index d4567a476f..c8da8e5615 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
@@ -32,12 +32,11 @@ public interface KafkaClientComponent {
 
     PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
             .name("bootstrap.servers")
-            .displayName("Kafka Brokers")
-            .description("Comma-separated list of Kafka Brokers in the format 
host:port")
+            .displayName("Bootstrap Servers")
+            .description("Comma-separated list of Kafka Bootstrap Servers in 
the format host:port. Corresponds to Kafka bootstrap.servers property")
             .required(true)
             .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
-            .defaultValue("localhost:9092")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .build();
 
     PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
@@ -58,13 +57,16 @@ public interface KafkaClientComponent {
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .allowableValues(SaslMechanism.getAvailableSaslMechanisms())
             .defaultValue(SaslMechanism.GSSAPI)
+            .dependsOn(SECURITY_PROTOCOL,
+                    SecurityProtocol.SASL_PLAINTEXT.name(),
+                    SecurityProtocol.SASL_SSL.name())
             .build();
 
     PropertyDescriptor SASL_USERNAME = new PropertyDescriptor.Builder()
             .name("sasl.username")
-            .displayName("Username")
+            .displayName("SASL Username")
             .description("Username provided with configured password when 
using PLAIN or SCRAM SASL Mechanisms")
-            .required(false)
+            .required(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
             .dependsOn(
@@ -77,9 +79,9 @@ public interface KafkaClientComponent {
 
     PropertyDescriptor SASL_PASSWORD = new PropertyDescriptor.Builder()
             .name("sasl.password")
-            .displayName("Password")
+            .displayName("SASL Password")
             .description("Password provided with configured username when 
using PLAIN or SCRAM SASL Mechanisms")
-            .required(false)
+            .required(true)
             .sensitive(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
@@ -119,8 +121,7 @@ public interface KafkaClientComponent {
             .build();
 
     PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
-            .name("ssl.context.service")
-            .displayName("SSL Context Service")
+            .name("SSL Context Service")
             .description("Service supporting SSL communication with Kafka 
brokers")
             .required(false)
             .identifiesControllerService(SSLContextService.class)
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaPublishComponent.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaPublishComponent.java
deleted file mode 100644
index 922a37da29..0000000000
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaPublishComponent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.kafka.shared.component;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.kafka.shared.property.FailureStrategy;
-
-/**
- * Kafka Publish Component interface with common Property Descriptors
- */
-public interface KafkaPublishComponent extends KafkaClientComponent {
-    PropertyDescriptor FAILURE_STRATEGY = new PropertyDescriptor.Builder()
-            .name("Failure Strategy")
-            .displayName("Failure Strategy")
-            .description("Specifies how the processor handles a FlowFile if it 
is unable to publish the data to Kafka")
-            .required(true)
-            .allowableValues(FailureStrategy.class)
-            .defaultValue(FailureStrategy.ROUTE_TO_FAILURE)
-            .build();
-}

Reply via email to