This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 8b6bf16929 NIFI-13936 Removed GCP PubSub Lite Processors (#9455)
8b6bf16929 is described below
commit 8b6bf169296aa8d0a7660b6fb1d1fbda41216474
Author: Pierre Villard <[email protected]>
AuthorDate: Mon Oct 28 15:36:40 2024 +0100
NIFI-13936 Removed GCP PubSub Lite Processors (#9455)
Signed-off-by: David Handermann <[email protected]>
---
.../nifi-gcp-bundle/nifi-gcp-processors/pom.xml | 4 -
.../gcp/pubsub/lite/ConsumeGCPubSubLite.java | 297 -------------------
.../gcp/pubsub/lite/PublishGCPubSubLite.java | 320 ---------------------
.../services/org.apache.nifi.processor.Processor | 2 -
.../gcp/pubsub/lite/PublishGCPubSubLiteTest.java | 65 -----
5 files changed, 688 deletions(-)
diff --git a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 49f122a61c..44e00b751e 100644
--- a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -124,10 +124,6 @@
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
</dependency>
- <dependency>
- <groupId>com.google.cloud</groupId>
- <artifactId>google-cloud-pubsublite</artifactId>
- </dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-drive</artifactId>
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite.java
deleted file mode 100644
index 9cab4f6383..0000000000
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.gcp.pubsub.lite;
-
-import com.google.api.gax.core.FixedCredentialsProvider;
-import com.google.api.gax.rpc.ApiException;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.pubsub.v1.AckReplyConsumer;
-import com.google.cloud.pubsub.v1.MessageReceiver;
-import com.google.cloud.pubsublite.SubscriptionPath;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
-import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
-import com.google.pubsub.v1.PubsubMessage;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.ConfigVerificationResult;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.ConfigVerificationResult.Outcome;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.VerifiableProcessor;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
-import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
-import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
-import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
-import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
-import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
-import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
-import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
-import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_ATTRIBUTE;
-import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_DESCRIPTION;
-import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GOOGLE_CLOUD_PLATFORM_SCOPE;
-
-@SeeAlso({PublishGCPubSubLite.class})
-@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
-@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "consume",
"lite"})
-@CapabilityDescription("Consumes message from the configured Google Cloud
PubSub Lite subscription.")
-@WritesAttributes({
- @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description =
MESSAGE_ID_DESCRIPTION),
- @WritesAttribute(attribute = ORDERING_KEY_ATTRIBUTE, description =
ORDERING_KEY_DESCRIPTION),
- @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE,
description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
- @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description =
MSG_PUBLISH_TIME_DESCRIPTION),
- @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description
= DYNAMIC_ATTRIBUTES_DESCRIPTION)
-})
-public class ConsumeGCPubSubLite extends AbstractGCPubSubProcessor implements
VerifiableProcessor {
-
- public static final PropertyDescriptor SUBSCRIPTION = new
PropertyDescriptor.Builder()
- .name("gcp-pubsub-subscription")
- .displayName("Subscription")
- .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
- .description("Name of the Google Cloud Pub/Sub Subscription.
Example:
projects/8476107443/locations/europe-west1-d/subscriptions/my-lite-subscription")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- public static final PropertyDescriptor BYTES_OUTSTANDING = new
PropertyDescriptor
- .Builder().name("gcp-bytes-outstanding")
- .displayName("Bytes Outstanding")
- .description("The number of quota bytes that may be outstanding to
the client.")
- .required(true)
- .defaultValue("10 MB")
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor MESSAGES_OUTSTANDING = new
PropertyDescriptor
- .Builder().name("gcp-messages-outstanding")
- .displayName("Messages Outstanding")
- .description("The number of messages that may be outstanding to
the client.")
- .required(true)
- .defaultValue("1000")
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .build();
-
- private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
- GCP_CREDENTIALS_PROVIDER_SERVICE,
- SUBSCRIPTION,
- BYTES_OUTSTANDING,
- MESSAGES_OUTSTANDING
- );
-
- public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
-
- private Subscriber subscriber = null;
- private BlockingQueue<Message> messages = new LinkedBlockingQueue<>();
-
- @Override
- public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return DESCRIPTORS;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return RELATIONSHIPS;
- }
-
- @Override
- protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
- final Collection<ValidationResult> results = new
ArrayList<ValidationResult>(1);
- final String subscription =
validationContext.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue();
-
- try {
- SubscriptionPath.parse(subscription);
- } catch (final ApiException e) {
- results.add(new ValidationResult.Builder()
- .subject(SUBSCRIPTION.getName())
- .input(subscription)
- .valid(false)
- .explanation("The Suscription does not have a valid
format.")
- .build());
- }
-
- return results;
- }
-
- @OnScheduled
- public void onScheduled(final ProcessContext context) {
- try {
- if (subscriber == null) {
- subscriber = getSubscriber(context);
- }
- } catch (final Exception e) {
- getLogger().error("Failed to create Google Cloud PubSub Lite
Subscriber", e);
- throw new ProcessException(e);
- }
- try {
- subscriber.startAsync().awaitRunning();
- } catch (final Exception e) {
- getLogger().error("Failed to create Google Cloud PubSub Lite
Subscriber", subscriber.failureCause());
- throw new ProcessException(e);
- }
- }
-
- @OnStopped
- public void onStopped() {
- try {
- if (subscriber != null) {
- subscriber.stopAsync().awaitTerminated();
- subscriber = null;
- }
- } catch (final Exception e) {
- getLogger().warn("Failed to gracefully shutdown the Google Cloud
PubSub Lite Subscriber", e);
- }
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- if (subscriber == null) {
- getLogger().error("Google Cloud PubSub Lite Subscriber was not
properly created. Yielding the processor...");
- context.yield();
- return;
- }
-
- if (!subscriber.isRunning()) {
- getLogger().error("Google Cloud PubSub Lite Subscriber is not
running. Yielding the processor...", subscriber.failureCause());
- throw new ProcessException(subscriber.failureCause());
- }
-
- final Message message = messages.poll();
- if (message == null) {
- context.yield();
- return;
- }
-
- FlowFile flowFile = session.create();
-
- final Map<String, String> attributes = new HashMap<>();
- attributes.put(MESSAGE_ID_ATTRIBUTE,
message.getMessage().getMessageId());
- attributes.put(ORDERING_KEY_ATTRIBUTE,
message.getMessage().getOrderingKey());
- attributes.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE,
String.valueOf(message.getMessage().getAttributesCount()));
- attributes.put(MSG_PUBLISH_TIME_ATTRIBUTE,
String.valueOf(message.getMessage().getPublishTime().getSeconds()));
- attributes.putAll(message.getMessage().getAttributesMap());
-
- flowFile = session.putAllAttributes(flowFile, attributes);
- flowFile = session.write(flowFile, out ->
out.write(message.getMessage().getData().toStringUtf8().getBytes()));
-
- session.transfer(flowFile, REL_SUCCESS);
- session.getProvenanceReporter().receive(flowFile,
context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue());
-
- message.getConsumer().ack();
- }
-
- private Subscriber getSubscriber(final ProcessContext context) {
-
- final SubscriptionPath subscriptionPath =
SubscriptionPath.parse(context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue());
-
- final FlowControlSettings flowControlSettings =
FlowControlSettings.builder()
-
.setBytesOutstanding(context.getProperty(BYTES_OUTSTANDING).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue())
-
.setMessagesOutstanding(context.getProperty(MESSAGES_OUTSTANDING).evaluateAttributeExpressions().asLong())
- .build();
-
- final MessageReceiver receiver =
- (PubsubMessage message, AckReplyConsumer consumer) -> {
- try {
- messages.put(new Message(message, consumer));
- } catch (final InterruptedException e) {
- getLogger().error("Could not save the message inside
the internal queue of the processor", e);
- }
- };
-
- final SubscriberSettings subscriberSettings =
SubscriberSettings.newBuilder()
-
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
- .setSubscriptionPath(subscriptionPath)
- .setReceiver(receiver)
- .setPerPartitionFlowControlSettings(flowControlSettings)
- .build();
-
- return Subscriber.create(subscriberSettings);
- }
-
- private class Message {
- private PubsubMessage message;
- private AckReplyConsumer consumer;
-
- public Message(final PubsubMessage message, final AckReplyConsumer
consumer) {
- this.message = message;
- this.consumer = consumer;
- }
-
- public PubsubMessage getMessage() {
- return message;
- }
-
- public AckReplyConsumer getConsumer() {
- return consumer;
- }
- }
-
- @Override
- public List<ConfigVerificationResult> verify(final ProcessContext context,
final ComponentLog verificationLogger, final Map<String, String> attributes) {
- final List<ConfigVerificationResult> verificationResults = new
ArrayList<>();
- try {
- getSubscriber(context);
- verificationResults.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Create the Subscriber")
- .outcome(Outcome.SUCCESSFUL)
- .explanation("Successfully created the Google Cloud PubSub
Lite Subscriber")
- .build());
- } catch (final Exception e) {
- verificationLogger.error("Failed to create Google Cloud PubSub
Lite Subscriber", e);
-
- verificationResults.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Create the Subscriber")
- .outcome(Outcome.FAILED)
- .explanation("Failed to create Google Cloud PubSub Lite
Subscriber: " + e.getLocalizedMessage())
- .build());
- }
- return verificationResults;
- }
-
- @Override
- protected GoogleCredentials getGoogleCredentials(final ProcessContext
context) {
- return
super.getGoogleCredentials(context).createScoped(GOOGLE_CLOUD_PLATFORM_SCOPE);
- }
-}
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java
deleted file mode 100644
index 49909399e4..0000000000
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.gcp.pubsub.lite;
-
-import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutures;
-import com.google.api.gax.batching.BatchingSettings;
-import com.google.api.gax.core.FixedCredentialsProvider;
-import com.google.api.gax.rpc.ApiException;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.pubsublite.TopicPath;
-import com.google.cloud.pubsublite.cloudpubsub.Publisher;
-import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Timestamp;
-import com.google.pubsub.v1.PubsubMessage;
-
-import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.SystemResource;
-import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.ConfigVerificationResult;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.ConfigVerificationResult.Outcome;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.VerifiableProcessor;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
-import org.threeten.bp.Duration;
-
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
-import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
-import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
-import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
-import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GOOGLE_CLOUD_PLATFORM_SCOPE;
-
-@SeeAlso({ConsumeGCPubSubLite.class})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish",
"lite"})
-@CapabilityDescription("Publishes the content of the incoming FlowFile to the
configured Google Cloud PubSub Lite topic. The processor supports dynamic
properties." +
- " If any dynamic properties are present, they will be sent along with
the message in the form of 'attributes'.")
-@DynamicProperty(name = "Attribute name", value = "Value to be set to the
attribute",
- description = "Attributes to be set for the outgoing Google Cloud
PubSub Lite message", expressionLanguageScope =
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-@WritesAttributes({
- @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description =
MESSAGE_ID_DESCRIPTION),
- @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description =
TOPIC_NAME_DESCRIPTION)
-})
-@SystemResourceConsideration(resource = SystemResource.MEMORY, description =
"The entirety of the FlowFile's content "
- + "will be read into memory to be sent as a PubSub message.")
-public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements
VerifiableProcessor {
-
- public static final PropertyDescriptor TOPIC_NAME = new
PropertyDescriptor.Builder()
- .name("gcp-pubsub-topic")
- .displayName("Topic Name")
- .description("Name of the Google Cloud PubSub Topic. Example:
projects/8476107443/locations/europe-west1-d/topics/my-lite-topic")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- public static final PropertyDescriptor ORDERING_KEY = new
PropertyDescriptor
- .Builder().name("gcp-ordering-key")
- .displayName("Ordering Key")
- .description("Messages with the same ordering key will always get
published to the same partition. When this property is not "
- + "set, messages can get published to different partitions
if more than one partition exists for the topic.")
- .required(false)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .build();
-
- private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
- GCP_CREDENTIALS_PROVIDER_SERVICE,
- TOPIC_NAME,
- ORDERING_KEY,
- BATCH_SIZE_THRESHOLD,
- BATCH_BYTES_THRESHOLD,
- BATCH_DELAY_THRESHOLD
- );
-
- public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS,
REL_FAILURE);
-
- private Publisher publisher = null;
-
- @Override
- public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return DESCRIPTORS;
- }
-
- @Override
- protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
- .required(false)
- .name(propertyDescriptorName)
- .displayName(propertyDescriptorName)
-
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
- .dynamic(true)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return RELATIONSHIPS;
- }
-
- @Override
- protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
- final Collection<ValidationResult> results = new
ArrayList<ValidationResult>(1);
- final String topic =
validationContext.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
-
- try {
- TopicPath.parse(topic);
- } catch (final ApiException e) {
- results.add(new ValidationResult.Builder()
- .subject(TOPIC_NAME.getName())
- .input(topic)
- .valid(false)
- .explanation("The Topic does not have a valid format.")
- .build());
- }
-
- return results;
- }
-
- @Override
- @OnScheduled
- public void onScheduled(final ProcessContext context) {
- try {
- if (publisher == null) {
- publisher = getPublisher(context);
- }
- } catch (final Exception e) {
- getLogger().error("Failed to create Google Cloud PubSub Lite
Publisher", e);
- throw new ProcessException(e);
- }
- try {
- publisher.startAsync().awaitRunning();
- } catch (final Exception e) {
- getLogger().error("Failed to create Google Cloud PubSub Lite
Publisher", publisher.failureCause());
- throw new ProcessException(e);
- }
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- final int flowFileCount =
context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
- final List<FlowFile> flowFiles = session.get(flowFileCount);
-
- if (flowFiles.isEmpty()) {
- context.yield();
- return;
- }
-
- if (publisher == null) {
- getLogger().error("Google Cloud PubSub Lite Publisher was not
properly created. Yielding the processor...");
- context.yield();
- return;
- }
-
- if (!publisher.isRunning()) {
- getLogger().error("Google Cloud PubSub Lite Publisher is not
running. Yielding the processor...", publisher.failureCause());
- throw new ProcessException(publisher.failureCause());
- }
-
- final long startNanos = System.nanoTime();
- final List<FlowFile> successfulFlowFiles = new ArrayList<>();
- final String topicName =
context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
- final List<ApiFuture<String>> futures = new ArrayList<>();
-
- try {
- for (FlowFile flowFile : flowFiles) {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- session.exportTo(flowFile, baos);
- final ByteString flowFileContent =
ByteString.copyFrom(baos.toByteArray());
- final String orderingKey =
context.getProperty(ORDERING_KEY).evaluateAttributeExpressions(flowFile).getValue();
-
- final PubsubMessage.Builder message =
PubsubMessage.newBuilder().setData(flowFileContent)
- .setPublishTime(Timestamp.newBuilder().build())
- .putAllAttributes(getDynamicAttributesMap(context,
flowFile));
-
- if (orderingKey != null) {
- message.setOrderingKey(orderingKey);
- }
-
- final ApiFuture<String> messageIdFuture =
publisher.publish(message.build());
- futures.add(messageIdFuture);
-
- flowFile = session.putAttribute(flowFile,
TOPIC_NAME_ATTRIBUTE, topicName);
- }
-
- try {
- ApiFutures.allAsList(futures).get();
- successfulFlowFiles.addAll(flowFiles);
- } catch (InterruptedException | ExecutionException e) {
- getLogger().error("Failed to publish the messages to Google
Cloud PubSub Lite topic '{}' due to {}, "
- + "routing all messages from the batch to failure",
topicName, e.getLocalizedMessage(), e);
- session.transfer(flowFiles, REL_FAILURE);
- context.yield();
- }
- } finally {
- if (!successfulFlowFiles.isEmpty()) {
- session.transfer(successfulFlowFiles, REL_SUCCESS);
- for (FlowFile flowFile : successfulFlowFiles) {
- final long transmissionMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().send(flowFile, topicName,
transmissionMillis);
- }
- }
- }
- }
-
- @OnStopped
- public void onStopped() {
- try {
- if (publisher != null) {
- publisher.stopAsync().awaitTerminated();
- publisher = null;
- }
- } catch (final Exception e) {
- getLogger().warn("Failed to gracefully shutdown the Google Cloud
PubSub Lite Publisher", e);
- }
- }
-
- private Map<String, String> getDynamicAttributesMap(final ProcessContext
context, final FlowFile flowFile) {
- final Map<String, String> attributes = new HashMap<>();
- for (final Map.Entry<PropertyDescriptor, String> entry :
context.getProperties().entrySet()) {
- if (entry.getKey().isDynamic()) {
- final String value =
context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue();
- attributes.put(entry.getKey().getName(), value);
- }
- }
-
- return attributes;
- }
-
- private Publisher getPublisher(final ProcessContext context) {
- final TopicPath topicPath =
TopicPath.parse(context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue());
-
- final PublisherSettings publisherSettings =
- PublisherSettings.newBuilder()
- .setTopicPath(topicPath)
-
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
- .setBatchingSettings(BatchingSettings.newBuilder()
-
.setElementCountThreshold(context.getProperty(BATCH_SIZE_THRESHOLD).asLong())
-
.setRequestByteThreshold(context.getProperty(BATCH_BYTES_THRESHOLD).asDataSize(DataUnit.B).longValue())
-
.setDelayThreshold(Duration.ofMillis(context.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS)))
- .setIsEnabled(true)
- .build())
- .build();
-
- return Publisher.create(publisherSettings);
- }
-
- @Override
- public List<ConfigVerificationResult> verify(final ProcessContext context,
final ComponentLog verificationLogger, final Map<String, String> attributes) {
- final List<ConfigVerificationResult> verificationResults = new
ArrayList<>();
- try {
- getPublisher(context);
- verificationResults.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Create the Publisher")
- .outcome(Outcome.SUCCESSFUL)
- .explanation("Successfully created the Google Cloud PubSub
Lite Publisher")
- .build());
- } catch (final Exception e) {
- verificationLogger.error("Failed to create Google Cloud PubSub
Lite Publisher", e);
-
- verificationResults.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Create the Publisher")
- .outcome(Outcome.FAILED)
- .explanation("Failed to create Google Cloud PubSub Lite
Publisher: " + e.getLocalizedMessage())
- .build());
- }
- return verificationResults;
- }
-
- @Override
- protected GoogleCredentials getGoogleCredentials(final ProcessContext
context) {
- return
super.getGoogleCredentials(context).createScoped(GOOGLE_CLOUD_PLATFORM_SCOPE);
- }
-}
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index fbcb0d907a..100869e7b2 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -18,8 +18,6 @@ org.apache.nifi.processors.gcp.storage.DeleteGCSObject
org.apache.nifi.processors.gcp.storage.ListGCSBucket
org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub
org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
-org.apache.nifi.processors.gcp.pubsub.lite.PublishGCPubSubLite
-org.apache.nifi.processors.gcp.pubsub.lite.ConsumeGCPubSubLite
org.apache.nifi.processors.gcp.bigquery.PutBigQuery
org.apache.nifi.processors.gcp.drive.ListGoogleDrive
org.apache.nifi.processors.gcp.drive.FetchGoogleDrive
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java
deleted file mode 100644
index 9204fc6aff..0000000000
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.gcp.pubsub.lite;
-
-import org.apache.nifi.controller.ControllerService;
-import
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-public class PublishGCPubSubLiteTest {
-
- private TestRunner runner;
-
- @BeforeEach
- void setRunner() {
- runner = TestRunners.newTestRunner(PublishGCPubSubLite.class);
- }
-
- @Test
- void testPropertyDescriptors() throws InitializationException {
- runner.assertNotValid();
-
- final ControllerService controllerService = new
GCPCredentialsControllerService();
- final String controllerServiceId =
GCPCredentialsControllerService.class.getSimpleName();
- runner.addControllerService(controllerServiceId, controllerService);
- runner.enableControllerService(controllerService);
-
runner.setProperty(PublishGCPubSubLite.GCP_CREDENTIALS_PROVIDER_SERVICE,
controllerServiceId);
- runner.assertNotValid();
-
- runner.setProperty(PublishGCPubSubLite.TOPIC_NAME,
"projects/my-project/locations/my-location/topics/my-topic");
- runner.assertValid();
-
- runner.setProperty(PublishGCPubSubLite.BATCH_SIZE_THRESHOLD, "-1");
- runner.assertNotValid();
- runner.setProperty(PublishGCPubSubLite.BATCH_SIZE_THRESHOLD, "15");
- runner.assertValid();
-
- runner.setProperty(PublishGCPubSubLite.BATCH_BYTES_THRESHOLD, "3");
- runner.assertNotValid();
- runner.setProperty(PublishGCPubSubLite.BATCH_BYTES_THRESHOLD, "3 MB");
- runner.assertValid();
-
- runner.setProperty(PublishGCPubSubLite.BATCH_DELAY_THRESHOLD, "100");
- runner.assertNotValid();
- runner.setProperty(PublishGCPubSubLite.BATCH_DELAY_THRESHOLD, "100
millis");
- runner.assertValid();
- }
-}