This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new e45d5d7d6f NIFI-14630 Corrected usage of KafkaClientComponent
interface (#9993)
e45d5d7d6f is described below
commit e45d5d7d6fd0b31af3ac6ecbca51adfa0555a7d4
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Tue Jun 17 16:11:06 2025 +0200
NIFI-14630 Corrected usage of KafkaClientComponent interface (#9993)
Signed-off-by: David Handermann <[email protected]>
---
.../apache/nifi/kafka/processors/PublishKafka.java | 12 +++-
.../kafka/service/Kafka3ConnectionService.java | 84 +---------------------
.../shared/component/KafkaClientComponent.java | 21 +++---
.../shared/component/KafkaPublishComponent.java | 34 ---------
4 files changed, 23 insertions(+), 128 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
index c61dd0d2ea..4b1d7cef0e 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
@@ -54,7 +54,6 @@ import
org.apache.nifi.kafka.service.api.producer.PublishContext;
import org.apache.nifi.kafka.service.api.producer.RecordSummary;
import org.apache.nifi.kafka.service.api.record.KafkaRecord;
import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
-import org.apache.nifi.kafka.shared.component.KafkaPublishComponent;
import org.apache.nifi.kafka.shared.property.FailureStrategy;
import org.apache.nifi.kafka.shared.property.KeyEncoding;
import org.apache.nifi.kafka.shared.property.PublishStrategy;
@@ -104,7 +103,7 @@ import java.util.stream.Collectors;
@WritesAttribute(attribute = "msg.count", description = "The number of
messages that were sent to Kafka for this FlowFile. This attribute is added
only to "
+ "FlowFiles that are routed to success.")
@SeeAlso({ConsumeKafka.class})
-public class PublishKafka extends AbstractProcessor implements
KafkaPublishComponent, VerifiableProcessor {
+public class PublishKafka extends AbstractProcessor implements
VerifiableProcessor {
protected static final String MSG_COUNT = "msg.count";
public static final PropertyDescriptor CONNECTION_SERVICE = new
PropertyDescriptor.Builder()
@@ -123,6 +122,15 @@ public class PublishKafka extends AbstractProcessor
implements KafkaPublishCompo
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
+ public static final PropertyDescriptor FAILURE_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Failure Strategy")
+ .displayName("Failure Strategy")
+ .description("Specifies how the processor handles a FlowFile if it
is unable to publish the data to Kafka")
+ .required(true)
+ .allowableValues(FailureStrategy.class)
+ .defaultValue(FailureStrategy.ROUTE_TO_FAILURE)
+ .build();
+
static final PropertyDescriptor DELIVERY_GUARANTEE = new
PropertyDescriptor.Builder()
.name("acks")
.displayName("Delivery Guarantee")
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index cb19eae037..8f95f4d03a 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -50,13 +50,13 @@ import
org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
import org.apache.nifi.kafka.service.consumer.Subscription;
import org.apache.nifi.kafka.service.producer.Kafka3ProducerService;
import org.apache.nifi.kafka.service.security.OAuthBearerLoginCallbackHandler;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
import org.apache.nifi.kafka.shared.property.IsolationLevel;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
import
org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
-import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.util.StandardValidators;
@@ -76,8 +76,6 @@ import java.util.regex.Pattern;
import static
org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
import static
org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
import static
org.apache.nifi.kafka.service.security.OAuthBearerLoginCallbackHandler.PROPERTY_KEY_NIFI_OAUTH_2_ACCESS_TOKEN_PROVIDER;
-import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.KERBEROS_SERVICE_NAME;
-import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.OAUTH2_ACCESS_TOKEN_PROVIDER_SERVICE;
import static
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SASL_LOGIN_CALLBACK_HANDLER_CLASS;
import static
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_LOCATION;
import static
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_PASSWORD;
@@ -94,85 +92,7 @@ import static
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUS
+ " For the list of available Kafka properties please refer
to: http://kafka.apache.org/documentation.html#configuration.",
expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT)
@CapabilityDescription("Provides and manages connections to Kafka Brokers for
producer or consumer operations.")
-public class Kafka3ConnectionService extends AbstractControllerService
implements KafkaConnectionService, VerifiableControllerService {
-
- public static final PropertyDescriptor BOOTSTRAP_SERVERS = new
PropertyDescriptor.Builder()
- .name("bootstrap.servers")
- .displayName("Bootstrap Servers")
- .description("Comma-separated list of Kafka Bootstrap Servers in
the format host:port. Corresponds to Kafka bootstrap.servers property")
- .required(true)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .build();
-
- public static final PropertyDescriptor SECURITY_PROTOCOL = new
PropertyDescriptor.Builder()
- .name("security.protocol")
- .displayName("Security Protocol")
- .description("Security protocol used to communicate with brokers.
Corresponds to Kafka Client security.protocol property")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .allowableValues(SecurityProtocol.values())
- .defaultValue(SecurityProtocol.PLAINTEXT.name())
- .build();
-
- public static final PropertyDescriptor SASL_MECHANISM = new
PropertyDescriptor.Builder()
- .name("sasl.mechanism")
- .displayName("SASL Mechanism")
- .description("SASL mechanism used for authentication. Corresponds
to Kafka Client sasl.mechanism property")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .allowableValues(SaslMechanism.getAvailableSaslMechanisms())
- .defaultValue(SaslMechanism.GSSAPI.getValue())
- .dependsOn(SECURITY_PROTOCOL,
- SecurityProtocol.SASL_PLAINTEXT.name(),
- SecurityProtocol.SASL_SSL.name())
- .build();
-
- public static final PropertyDescriptor SASL_USERNAME = new
PropertyDescriptor.Builder()
- .name("sasl.username")
- .displayName("SASL Username")
- .description("Username provided with configured password when
using PLAIN or SCRAM SASL Mechanisms")
- .required(true)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .dependsOn(
- SASL_MECHANISM,
- SaslMechanism.PLAIN.getValue(),
- SaslMechanism.SCRAM_SHA_256.getValue(),
- SaslMechanism.SCRAM_SHA_512.getValue()
- )
- .build();
-
- public static final PropertyDescriptor SASL_PASSWORD = new
PropertyDescriptor.Builder()
- .name("sasl.password")
- .displayName("SASL Password")
- .description("Password provided with configured username when
using PLAIN or SCRAM SASL Mechanisms")
- .required(true)
- .sensitive(true)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .dependsOn(
- SASL_MECHANISM,
- SaslMechanism.PLAIN.getValue(),
- SaslMechanism.SCRAM_SHA_256.getValue(),
- SaslMechanism.SCRAM_SHA_512.getValue()
- )
- .build();
-
- public static final PropertyDescriptor
SELF_CONTAINED_KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
- .name("kerberos-user-service")
- .displayName("Kerberos User Service")
- .description("Service supporting user authentication with
Kerberos")
-
.identifiesControllerService(SelfContainedKerberosUserService.class)
- .required(false)
- .build();
-
- public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new
PropertyDescriptor.Builder()
- .name("SSL Context Service")
- .description("Service supporting SSL communication with Kafka
brokers")
- .required(false)
- .identifiesControllerService(SSLContextService.class)
- .build();
+public class Kafka3ConnectionService extends AbstractControllerService
implements KafkaConnectionService, VerifiableControllerService,
KafkaClientComponent {
public static final PropertyDescriptor TRANSACTION_ISOLATION_LEVEL = new
PropertyDescriptor.Builder()
.name("isolation.level")
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
index d4567a476f..c8da8e5615 100644
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
+++
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
@@ -32,12 +32,11 @@ public interface KafkaClientComponent {
PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
.name("bootstrap.servers")
- .displayName("Kafka Brokers")
- .description("Comma-separated list of Kafka Brokers in the format
host:port")
+ .displayName("Bootstrap Servers")
+ .description("Comma-separated list of Kafka Bootstrap Servers in
the format host:port. Corresponds to Kafka bootstrap.servers property")
.required(true)
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .defaultValue("localhost:9092")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
@@ -58,13 +57,16 @@ public interface KafkaClientComponent {
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(SaslMechanism.getAvailableSaslMechanisms())
.defaultValue(SaslMechanism.GSSAPI)
+ .dependsOn(SECURITY_PROTOCOL,
+ SecurityProtocol.SASL_PLAINTEXT.name(),
+ SecurityProtocol.SASL_SSL.name())
.build();
PropertyDescriptor SASL_USERNAME = new PropertyDescriptor.Builder()
.name("sasl.username")
- .displayName("Username")
+ .displayName("SASL Username")
.description("Username provided with configured password when
using PLAIN or SCRAM SASL Mechanisms")
- .required(false)
+ .required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.dependsOn(
@@ -77,9 +79,9 @@ public interface KafkaClientComponent {
PropertyDescriptor SASL_PASSWORD = new PropertyDescriptor.Builder()
.name("sasl.password")
- .displayName("Password")
+ .displayName("SASL Password")
.description("Password provided with configured username when
using PLAIN or SCRAM SASL Mechanisms")
- .required(false)
+ .required(true)
.sensitive(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
@@ -119,8 +121,7 @@ public interface KafkaClientComponent {
.build();
PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
- .name("ssl.context.service")
- .displayName("SSL Context Service")
+ .name("SSL Context Service")
.description("Service supporting SSL communication with Kafka
brokers")
.required(false)
.identifiesControllerService(SSLContextService.class)
diff --git
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaPublishComponent.java
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaPublishComponent.java
deleted file mode 100644
index 922a37da29..0000000000
---
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaPublishComponent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.kafka.shared.component;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.kafka.shared.property.FailureStrategy;
-
-/**
- * Kafka Publish Component interface with common Property Descriptors
- */
-public interface KafkaPublishComponent extends KafkaClientComponent {
- PropertyDescriptor FAILURE_STRATEGY = new PropertyDescriptor.Builder()
- .name("Failure Strategy")
- .displayName("Failure Strategy")
- .description("Specifies how the processor handles a FlowFile if it
is unable to publish the data to Kafka")
- .required(true)
- .allowableValues(FailureStrategy.class)
- .defaultValue(FailureStrategy.ROUTE_TO_FAILURE)
- .build();
-}