This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b6bf16929 NIFI-13936 Removed GCP PubSub Lite Processors (#9455)
8b6bf16929 is described below

commit 8b6bf169296aa8d0a7660b6fb1d1fbda41216474
Author: Pierre Villard <[email protected]>
AuthorDate: Mon Oct 28 15:36:40 2024 +0100

    NIFI-13936 Removed GCP PubSub Lite Processors (#9455)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi-gcp-bundle/nifi-gcp-processors/pom.xml    |   4 -
 .../gcp/pubsub/lite/ConsumeGCPubSubLite.java       | 297 -------------------
 .../gcp/pubsub/lite/PublishGCPubSubLite.java       | 320 ---------------------
 .../services/org.apache.nifi.processor.Processor   |   2 -
 .../gcp/pubsub/lite/PublishGCPubSubLiteTest.java   |  65 -----
 5 files changed, 688 deletions(-)

diff --git a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 49f122a61c..44e00b751e 100644
--- a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -124,10 +124,6 @@
             <groupId>com.google.cloud</groupId>
             <artifactId>google-cloud-pubsub</artifactId>
         </dependency>
-        <dependency>
-            <groupId>com.google.cloud</groupId>
-            <artifactId>google-cloud-pubsublite</artifactId>
-        </dependency>
         <dependency>
             <groupId>com.google.apis</groupId>
             <artifactId>google-api-services-drive</artifactId>
diff --git a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite.java b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite.java
deleted file mode 100644
index 9cab4f6383..0000000000
--- a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.gcp.pubsub.lite;
-
-import com.google.api.gax.core.FixedCredentialsProvider;
-import com.google.api.gax.rpc.ApiException;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.pubsub.v1.AckReplyConsumer;
-import com.google.cloud.pubsub.v1.MessageReceiver;
-import com.google.cloud.pubsublite.SubscriptionPath;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
-import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
-import com.google.pubsub.v1.PubsubMessage;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.ConfigVerificationResult;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.ConfigVerificationResult.Outcome;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.VerifiableProcessor;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_ATTRIBUTE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_DESCRIPTION;
-import static 
org.apache.nifi.processors.gcp.util.GoogleUtils.GOOGLE_CLOUD_PLATFORM_SCOPE;
-
-@SeeAlso({PublishGCPubSubLite.class})
-@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
-@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "consume", 
"lite"})
-@CapabilityDescription("Consumes message from the configured Google Cloud 
PubSub Lite subscription.")
-@WritesAttributes({
-        @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = 
MESSAGE_ID_DESCRIPTION),
-        @WritesAttribute(attribute = ORDERING_KEY_ATTRIBUTE, description = 
ORDERING_KEY_DESCRIPTION),
-        @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, 
description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
-        @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = 
MSG_PUBLISH_TIME_DESCRIPTION),
-        @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description 
= DYNAMIC_ATTRIBUTES_DESCRIPTION)
-})
-public class ConsumeGCPubSubLite extends AbstractGCPubSubProcessor implements 
VerifiableProcessor {
-
-    public static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
-            .name("gcp-pubsub-subscription")
-            .displayName("Subscription")
-            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
-            .description("Name of the Google Cloud Pub/Sub Subscription. 
Example: 
projects/8476107443/locations/europe-west1-d/subscriptions/my-lite-subscription")
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
-            .build();
-
-    public static final PropertyDescriptor BYTES_OUTSTANDING = new 
PropertyDescriptor
-            .Builder().name("gcp-bytes-outstanding")
-            .displayName("Bytes Outstanding")
-            .description("The number of quota bytes that may be outstanding to 
the client.")
-            .required(true)
-            .defaultValue("10 MB")
-            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor MESSAGES_OUTSTANDING = new 
PropertyDescriptor
-            .Builder().name("gcp-messages-outstanding")
-            .displayName("Messages Outstanding")
-            .description("The number of messages that may be outstanding to 
the client.")
-            .required(true)
-            .defaultValue("1000")
-            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
-
-    private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
-            GCP_CREDENTIALS_PROVIDER_SERVICE,
-            SUBSCRIPTION,
-            BYTES_OUTSTANDING,
-            MESSAGES_OUTSTANDING
-    );
-
-    public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
-
-    private Subscriber subscriber = null;
-    private BlockingQueue<Message> messages = new LinkedBlockingQueue<>();
-
-    @Override
-    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return DESCRIPTORS;
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return RELATIONSHIPS;
-    }
-
-    @Override
-    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
-        final Collection<ValidationResult> results = new 
ArrayList<ValidationResult>(1);
-        final String subscription = 
validationContext.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue();
-
-        try {
-            SubscriptionPath.parse(subscription);
-        } catch (final ApiException e) {
-            results.add(new ValidationResult.Builder()
-                    .subject(SUBSCRIPTION.getName())
-                    .input(subscription)
-                    .valid(false)
-                    .explanation("The Suscription does not have a valid 
format.")
-                    .build());
-        }
-
-        return results;
-    }
-
-    @OnScheduled
-    public void onScheduled(final ProcessContext context) {
-        try {
-            if (subscriber == null) {
-                subscriber = getSubscriber(context);
-            }
-        } catch (final Exception e) {
-            getLogger().error("Failed to create Google Cloud PubSub Lite 
Subscriber", e);
-            throw new ProcessException(e);
-        }
-        try {
-            subscriber.startAsync().awaitRunning();
-        } catch (final Exception e) {
-            getLogger().error("Failed to create Google Cloud PubSub Lite 
Subscriber", subscriber.failureCause());
-            throw new ProcessException(e);
-        }
-    }
-
-    @OnStopped
-    public void onStopped() {
-        try {
-            if (subscriber != null) {
-                subscriber.stopAsync().awaitTerminated();
-                subscriber = null;
-            }
-        } catch (final Exception e) {
-            getLogger().warn("Failed to gracefully shutdown the Google Cloud 
PubSub Lite Subscriber", e);
-        }
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        if (subscriber == null) {
-            getLogger().error("Google Cloud PubSub Lite Subscriber was not 
properly created. Yielding the processor...");
-            context.yield();
-            return;
-        }
-
-        if (!subscriber.isRunning()) {
-            getLogger().error("Google Cloud PubSub Lite Subscriber is not 
running. Yielding the processor...", subscriber.failureCause());
-            throw new ProcessException(subscriber.failureCause());
-        }
-
-        final Message message = messages.poll();
-        if (message == null) {
-            context.yield();
-            return;
-        }
-
-        FlowFile flowFile = session.create();
-
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put(MESSAGE_ID_ATTRIBUTE, 
message.getMessage().getMessageId());
-        attributes.put(ORDERING_KEY_ATTRIBUTE, 
message.getMessage().getOrderingKey());
-        attributes.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE, 
String.valueOf(message.getMessage().getAttributesCount()));
-        attributes.put(MSG_PUBLISH_TIME_ATTRIBUTE, 
String.valueOf(message.getMessage().getPublishTime().getSeconds()));
-        attributes.putAll(message.getMessage().getAttributesMap());
-
-        flowFile = session.putAllAttributes(flowFile, attributes);
-        flowFile = session.write(flowFile, out -> 
out.write(message.getMessage().getData().toStringUtf8().getBytes()));
-
-        session.transfer(flowFile, REL_SUCCESS);
-        session.getProvenanceReporter().receive(flowFile, 
context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue());
-
-        message.getConsumer().ack();
-    }
-
-    private Subscriber getSubscriber(final ProcessContext context) {
-
-        final SubscriptionPath subscriptionPath = 
SubscriptionPath.parse(context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue());
-
-        final FlowControlSettings flowControlSettings = 
FlowControlSettings.builder()
-                
.setBytesOutstanding(context.getProperty(BYTES_OUTSTANDING).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue())
-                
.setMessagesOutstanding(context.getProperty(MESSAGES_OUTSTANDING).evaluateAttributeExpressions().asLong())
-                .build();
-
-        final MessageReceiver receiver =
-                (PubsubMessage message, AckReplyConsumer consumer) -> {
-                    try {
-                        messages.put(new Message(message, consumer));
-                    } catch (final InterruptedException e) {
-                        getLogger().error("Could not save the message inside 
the internal queue of the processor", e);
-                    }
-                };
-
-        final SubscriberSettings subscriberSettings = 
SubscriberSettings.newBuilder()
-                
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
-                .setSubscriptionPath(subscriptionPath)
-                .setReceiver(receiver)
-                .setPerPartitionFlowControlSettings(flowControlSettings)
-                .build();
-
-        return Subscriber.create(subscriberSettings);
-    }
-
-    private class Message {
-        private PubsubMessage message;
-        private AckReplyConsumer consumer;
-
-        public Message(final PubsubMessage message, final AckReplyConsumer 
consumer) {
-            this.message = message;
-            this.consumer = consumer;
-        }
-
-        public PubsubMessage getMessage() {
-            return message;
-        }
-
-        public AckReplyConsumer getConsumer() {
-            return consumer;
-        }
-    }
-
-    @Override
-    public List<ConfigVerificationResult> verify(final ProcessContext context, 
final ComponentLog verificationLogger, final Map<String, String> attributes) {
-        final List<ConfigVerificationResult> verificationResults = new 
ArrayList<>();
-        try {
-            getSubscriber(context);
-            verificationResults.add(new ConfigVerificationResult.Builder()
-                    .verificationStepName("Create the Subscriber")
-                    .outcome(Outcome.SUCCESSFUL)
-                    .explanation("Successfully created the Google Cloud PubSub 
Lite Subscriber")
-                    .build());
-        } catch (final Exception e) {
-            verificationLogger.error("Failed to create Google Cloud PubSub 
Lite Subscriber", e);
-
-            verificationResults.add(new ConfigVerificationResult.Builder()
-                    .verificationStepName("Create the Subscriber")
-                    .outcome(Outcome.FAILED)
-                    .explanation("Failed to create Google Cloud PubSub Lite 
Subscriber: " + e.getLocalizedMessage())
-                    .build());
-        }
-        return verificationResults;
-    }
-
-    @Override
-    protected GoogleCredentials getGoogleCredentials(final ProcessContext 
context) {
-        return 
super.getGoogleCredentials(context).createScoped(GOOGLE_CLOUD_PLATFORM_SCOPE);
-    }
-}
diff --git a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java
deleted file mode 100644
index 49909399e4..0000000000
--- a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.gcp.pubsub.lite;
-
-import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutures;
-import com.google.api.gax.batching.BatchingSettings;
-import com.google.api.gax.core.FixedCredentialsProvider;
-import com.google.api.gax.rpc.ApiException;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.pubsublite.TopicPath;
-import com.google.cloud.pubsublite.cloudpubsub.Publisher;
-import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Timestamp;
-import com.google.pubsub.v1.PubsubMessage;
-
-import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.SystemResource;
-import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.ConfigVerificationResult;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.ConfigVerificationResult.Outcome;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.VerifiableProcessor;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
-import org.threeten.bp.Duration;
-
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
-import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
-import static 
org.apache.nifi.processors.gcp.util.GoogleUtils.GOOGLE_CLOUD_PLATFORM_SCOPE;
-
-@SeeAlso({ConsumeGCPubSubLite.class})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish", 
"lite"})
-@CapabilityDescription("Publishes the content of the incoming FlowFile to the 
configured Google Cloud PubSub Lite topic. The processor supports dynamic 
properties." +
-        " If any dynamic properties are present, they will be sent along with 
the message in the form of 'attributes'.")
-@DynamicProperty(name = "Attribute name", value = "Value to be set to the 
attribute",
-        description = "Attributes to be set for the outgoing Google Cloud 
PubSub Lite message", expressionLanguageScope = 
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-@WritesAttributes({
-        @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = 
MESSAGE_ID_DESCRIPTION),
-        @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = 
TOPIC_NAME_DESCRIPTION)
-})
-@SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"The entirety of the FlowFile's content "
-        + "will be read into memory to be sent as a PubSub message.")
-public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements 
VerifiableProcessor {
-
-    public static final PropertyDescriptor TOPIC_NAME = new 
PropertyDescriptor.Builder()
-            .name("gcp-pubsub-topic")
-            .displayName("Topic Name")
-            .description("Name of the Google Cloud PubSub Topic. Example: 
projects/8476107443/locations/europe-west1-d/topics/my-lite-topic")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
-            .build();
-
-    public static final PropertyDescriptor ORDERING_KEY = new 
PropertyDescriptor
-            .Builder().name("gcp-ordering-key")
-            .displayName("Ordering Key")
-            .description("Messages with the same ordering key will always get 
published to the same partition. When this property is not "
-                    + "set, messages can get published to different partitions 
if more than one partition exists for the topic.")
-            .required(false)
-            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .build();
-
-    private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
-            GCP_CREDENTIALS_PROVIDER_SERVICE,
-            TOPIC_NAME,
-            ORDERING_KEY,
-            BATCH_SIZE_THRESHOLD,
-            BATCH_BYTES_THRESHOLD,
-            BATCH_DELAY_THRESHOLD
-    );
-
-    public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, 
REL_FAILURE);
-
-    private Publisher publisher = null;
-
-    @Override
-    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return DESCRIPTORS;
-    }
-
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-                .required(false)
-                .name(propertyDescriptorName)
-                .displayName(propertyDescriptorName)
-                
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
-                .dynamic(true)
-                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-                .build();
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return RELATIONSHIPS;
-    }
-
-    @Override
-    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
-        final Collection<ValidationResult> results = new 
ArrayList<ValidationResult>(1);
-        final String topic = 
validationContext.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
-
-        try {
-            TopicPath.parse(topic);
-        } catch (final ApiException e) {
-            results.add(new ValidationResult.Builder()
-                    .subject(TOPIC_NAME.getName())
-                    .input(topic)
-                    .valid(false)
-                    .explanation("The Topic does not have a valid format.")
-                    .build());
-        }
-
-        return results;
-    }
-
-    @Override
-    @OnScheduled
-    public void onScheduled(final ProcessContext context) {
-        try {
-            if (publisher == null) {
-                publisher = getPublisher(context);
-            }
-        } catch (final Exception e) {
-            getLogger().error("Failed to create Google Cloud PubSub Lite 
Publisher", e);
-            throw new ProcessException(e);
-        }
-        try {
-            publisher.startAsync().awaitRunning();
-        } catch (final Exception e) {
-            getLogger().error("Failed to create Google Cloud PubSub Lite 
Publisher", publisher.failureCause());
-            throw new ProcessException(e);
-        }
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final int flowFileCount = 
context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
-        final List<FlowFile> flowFiles = session.get(flowFileCount);
-
-        if (flowFiles.isEmpty()) {
-            context.yield();
-            return;
-        }
-
-        if (publisher == null) {
-            getLogger().error("Google Cloud PubSub Lite Publisher was not 
properly created. Yielding the processor...");
-            context.yield();
-            return;
-        }
-
-        if (!publisher.isRunning()) {
-            getLogger().error("Google Cloud PubSub Lite Publisher is not 
running. Yielding the processor...", publisher.failureCause());
-            throw new ProcessException(publisher.failureCause());
-        }
-
-        final long startNanos = System.nanoTime();
-        final List<FlowFile> successfulFlowFiles = new ArrayList<>();
-        final String topicName = 
context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
-        final List<ApiFuture<String>> futures = new ArrayList<>();
-
-        try {
-            for (FlowFile flowFile : flowFiles) {
-                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                session.exportTo(flowFile, baos);
-                final ByteString flowFileContent = 
ByteString.copyFrom(baos.toByteArray());
-                final String orderingKey = 
context.getProperty(ORDERING_KEY).evaluateAttributeExpressions(flowFile).getValue();
-
-                final PubsubMessage.Builder message = 
PubsubMessage.newBuilder().setData(flowFileContent)
-                        .setPublishTime(Timestamp.newBuilder().build())
-                        .putAllAttributes(getDynamicAttributesMap(context, 
flowFile));
-
-                if (orderingKey != null) {
-                    message.setOrderingKey(orderingKey);
-                }
-
-                final ApiFuture<String> messageIdFuture = 
publisher.publish(message.build());
-                futures.add(messageIdFuture);
-
-                flowFile = session.putAttribute(flowFile, 
TOPIC_NAME_ATTRIBUTE, topicName);
-            }
-
-            try {
-                ApiFutures.allAsList(futures).get();
-                successfulFlowFiles.addAll(flowFiles);
-            } catch (InterruptedException | ExecutionException e) {
-                getLogger().error("Failed to publish the messages to Google 
Cloud PubSub Lite topic '{}' due to {}, "
-                        + "routing all messages from the batch to failure", 
topicName, e.getLocalizedMessage(), e);
-                session.transfer(flowFiles, REL_FAILURE);
-                context.yield();
-            }
-        } finally {
-            if (!successfulFlowFiles.isEmpty()) {
-                session.transfer(successfulFlowFiles, REL_SUCCESS);
-                for (FlowFile flowFile : successfulFlowFiles) {
-                    final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                    session.getProvenanceReporter().send(flowFile, topicName, 
transmissionMillis);
-                }
-            }
-        }
-    }
-
-    @OnStopped
-    public void onStopped() {
-        try {
-            if (publisher != null) {
-                publisher.stopAsync().awaitTerminated();
-                publisher = null;
-            }
-        } catch (final Exception e) {
-            getLogger().warn("Failed to gracefully shutdown the Google Cloud 
PubSub Lite Publisher", e);
-        }
-    }
-
-    private Map<String, String> getDynamicAttributesMap(final ProcessContext 
context, final FlowFile flowFile) {
-        final Map<String, String> attributes = new HashMap<>();
-        for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
-            if (entry.getKey().isDynamic()) {
-                final String value = 
context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue();
-                attributes.put(entry.getKey().getName(), value);
-            }
-        }
-
-        return attributes;
-    }
-
-    private Publisher getPublisher(final ProcessContext context) {
-        final TopicPath topicPath = 
TopicPath.parse(context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue());
-
-        final PublisherSettings publisherSettings =
-                PublisherSettings.newBuilder()
-                    .setTopicPath(topicPath)
-                    
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
-                    .setBatchingSettings(BatchingSettings.newBuilder()
-                            
.setElementCountThreshold(context.getProperty(BATCH_SIZE_THRESHOLD).asLong())
-                            
.setRequestByteThreshold(context.getProperty(BATCH_BYTES_THRESHOLD).asDataSize(DataUnit.B).longValue())
-                            
.setDelayThreshold(Duration.ofMillis(context.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS)))
-                            .setIsEnabled(true)
-                            .build())
-                    .build();
-
-        return Publisher.create(publisherSettings);
-    }
-
-    @Override
-    public List<ConfigVerificationResult> verify(final ProcessContext context, 
final ComponentLog verificationLogger, final Map<String, String> attributes) {
-        final List<ConfigVerificationResult> verificationResults = new 
ArrayList<>();
-        try {
-            getPublisher(context);
-            verificationResults.add(new ConfigVerificationResult.Builder()
-                    .verificationStepName("Create the Publisher")
-                    .outcome(Outcome.SUCCESSFUL)
-                    .explanation("Successfully created the Google Cloud PubSub 
Lite Publisher")
-                    .build());
-        } catch (final Exception e) {
-            verificationLogger.error("Failed to create Google Cloud PubSub 
Lite Publisher", e);
-
-            verificationResults.add(new ConfigVerificationResult.Builder()
-                    .verificationStepName("Create the Publisher")
-                    .outcome(Outcome.FAILED)
-                    .explanation("Failed to create Google Cloud PubSub Lite 
Publisher: " + e.getLocalizedMessage())
-                    .build());
-        }
-        return verificationResults;
-    }
-
-    @Override
-    protected GoogleCredentials getGoogleCredentials(final ProcessContext 
context) {
-        return 
super.getGoogleCredentials(context).createScoped(GOOGLE_CLOUD_PLATFORM_SCOPE);
-    }
-}
diff --git a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index fbcb0d907a..100869e7b2 100644
--- a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -18,8 +18,6 @@ org.apache.nifi.processors.gcp.storage.DeleteGCSObject
 org.apache.nifi.processors.gcp.storage.ListGCSBucket
 org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub
 org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
-org.apache.nifi.processors.gcp.pubsub.lite.PublishGCPubSubLite
-org.apache.nifi.processors.gcp.pubsub.lite.ConsumeGCPubSubLite
 org.apache.nifi.processors.gcp.bigquery.PutBigQuery
 org.apache.nifi.processors.gcp.drive.ListGoogleDrive
 org.apache.nifi.processors.gcp.drive.FetchGoogleDrive
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java
deleted file mode 100644
index 9204fc6aff..0000000000
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.gcp.pubsub.lite;
-
-import org.apache.nifi.controller.ControllerService;
-import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-public class PublishGCPubSubLiteTest {
-
-    private TestRunner runner;
-
-    @BeforeEach
-    void setRunner() {
-        runner = TestRunners.newTestRunner(PublishGCPubSubLite.class);
-    }
-
-    @Test
-    void testPropertyDescriptors() throws InitializationException {
-        runner.assertNotValid();
-
-        final ControllerService controllerService = new 
GCPCredentialsControllerService();
-        final String controllerServiceId = 
GCPCredentialsControllerService.class.getSimpleName();
-        runner.addControllerService(controllerServiceId, controllerService);
-        runner.enableControllerService(controllerService);
-        
runner.setProperty(PublishGCPubSubLite.GCP_CREDENTIALS_PROVIDER_SERVICE, 
controllerServiceId);
-        runner.assertNotValid();
-
-        runner.setProperty(PublishGCPubSubLite.TOPIC_NAME, 
"projects/my-project/locations/my-location/topics/my-topic");
-        runner.assertValid();
-
-        runner.setProperty(PublishGCPubSubLite.BATCH_SIZE_THRESHOLD, "-1");
-        runner.assertNotValid();
-        runner.setProperty(PublishGCPubSubLite.BATCH_SIZE_THRESHOLD, "15");
-        runner.assertValid();
-
-        runner.setProperty(PublishGCPubSubLite.BATCH_BYTES_THRESHOLD, "3");
-        runner.assertNotValid();
-        runner.setProperty(PublishGCPubSubLite.BATCH_BYTES_THRESHOLD, "3 MB");
-        runner.assertValid();
-
-        runner.setProperty(PublishGCPubSubLite.BATCH_DELAY_THRESHOLD, "100");
-        runner.assertNotValid();
-        runner.setProperty(PublishGCPubSubLite.BATCH_DELAY_THRESHOLD, "100 
millis");
-        runner.assertValid();
-    }
-}


Reply via email to