Repository: nifi
Updated Branches:
  refs/heads/master 9aa7e65f7 -> 1663a6c09


NIFI-5133: Implemented Google Cloud PubSub Processors

Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>

This closes #2724.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1663a6c0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1663a6c0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1663a6c0

Branch: refs/heads/master
Commit: 1663a6c09428cb0301c84e547cefc7e17ff2cd20
Parents: 9aa7e65
Author: zenfenan <sivaprasanna...@gmail.com>
Authored: Sat May 12 22:51:23 2018 +0530
Committer: Pierre Villard <pierre.villard...@gmail.com>
Committed: Wed May 30 09:47:18 2018 +0200

----------------------------------------------------------------------
 .../nifi-gcp-bundle/nifi-gcp-processors/pom.xml |   4 +
 .../processors/gcp/AbstractGCPProcessor.java    |   5 +-
 .../gcp/pubsub/AbstractGCPubSubProcessor.java   |  65 +++++
 .../processors/gcp/pubsub/ConsumeGCPubSub.java  | 201 +++++++++++++++
 .../processors/gcp/pubsub/PubSubAttributes.java |  42 ++++
 .../processors/gcp/pubsub/PublishGCPubSub.java  | 249 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   4 +-
 .../gcp/pubsub/AbstractGCPubSubIT.java          |  47 ++++
 .../gcp/pubsub/ConsumeGCPubSubIT.java           | 102 ++++++++
 .../gcp/pubsub/PublishGCPubSubIT.java           |  75 ++++++
 10 files changed, 791 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1663a6c0/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index eeb757b..8469e29 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -72,6 +72,10 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>com.google.cloud</groupId>
+            <artifactId>google-cloud-pubsub</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.tdunning</groupId>
             <artifactId>json</artifactId>
             <version>1.8</version>

http://git-wip-us.apache.org/repos/asf/nifi/blob/1663a6c0/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
index 0505d63..0da6c62 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
@@ -28,6 +28,7 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
 
+
 import java.util.List;
 
 /**
@@ -110,7 +111,7 @@ public abstract class AbstractGCPProcessor<
      * @return GoogleCredentials for the processor to access.
      * @see  <a href="https://developers.google.com/api-client-library/java/google-api-java-client/reference/1.20.0/com/google/api/client/googleapis/auth/oauth2/GoogleCredential">AuthCredentials</a>
      */
-    private GoogleCredentials getGoogleCredentials(final ProcessContext 
context) {
+    protected GoogleCredentials getGoogleCredentials(final ProcessContext 
context) {
         final GCPCredentialsService gcpCredentialsService =
                 
context.getProperty(GCP_CREDENTIALS_PROVIDER_SERVICE).asControllerService(GCPCredentialsService.class);
         return gcpCredentialsService.getGoogleCredentials();
@@ -123,7 +124,7 @@ public abstract class AbstractGCPProcessor<
     @OnScheduled
     public void onScheduled(ProcessContext context) {
         final CloudServiceOptions options = getServiceOptions(context, 
getGoogleCredentials(context));
-        this.cloudService = options.getService();
+        this.cloudService = options != null ? options.getService() : null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/1663a6c0/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
new file mode 100644
index 0000000..31dc0a7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.ServiceOptions;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Common base for the Google Cloud Pub/Sub processors. It contributes the shared
+ * 'Batch Size' property and the success/failure relationships.
+ */
+public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor {
+
+    /**
+     * Number of messages handled per batch. The property is required and defaults to 15,
+     * so it can never be "left empty"; the description no longer claims otherwise.
+     */
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("gcp-pubsub-publish-batch-size")
+            .displayName("Batch Size")
+            .description("Indicates the number of messages the cloud service should bundle together in a batch.")
+            .required(true)
+            .defaultValue("15")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    /** Relationship for FlowFiles whose Pub/Sub operation succeeded. */
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles are routed to this relationship after a successful Google Cloud Pub/Sub operation.")
+            .build();
+
+    /** Relationship for FlowFiles whose Pub/Sub operation failed. */
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails.")
+            .build();
+
+    // Immutable default set; subclasses override getRelationships() when they need a different one.
+    private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    /**
+     * @return the unmodifiable set containing {@link #REL_SUCCESS} and {@link #REL_FAILURE}
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * Always returns {@code null}: this class does not supply a {@link ServiceOptions} instance.
+     *
+     * @param context     the process context (unused)
+     * @param credentials the resolved Google credentials (unused)
+     * @return always {@code null}
+     */
+    @Override
+    protected ServiceOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1663a6c0/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
new file mode 100644
index 0000000..23aaff0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.common.collect.ImmutableList;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION;
+
+@SeeAlso({PublishGCPubSub.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "consume"})
+@CapabilityDescription("Consumes message from the configured Google Cloud 
PubSub subscription. If the 'Batch Size' is set, " +
+        "the configured number of messages will be pulled in a single request, 
else only one message will be pulled.")
+@WritesAttributes({
+        @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = 
ACK_ID_DESCRIPTION),
+        @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, description = 
SERIALIZED_SIZE_DESCRIPTION),
+        @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, 
description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
+        @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = 
MSG_PUBLISH_TIME_DESCRIPTION),
+        @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description 
= DYNAMIC_ATTRIBUTES_DESCRIPTION)
+})
+public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
+
+    /**
+     * Required property naming the subscription to pull from. Expression Language is
+     * evaluated against the variable registry.
+     */
+    public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
+            .name("gcp-pubsub-subscription")
+            .displayName("Subscription")
+            .description("Name of the Google Cloud Pub/Sub Subscription")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    // Pub/Sub client stub; stays null until onScheduled creates it successfully.
+    private SubscriberStub subscriber;
+    // Pull request built in onScheduled and reused by every onTrigger call.
+    private PullRequest pullRequest;
+
+    // Failure captured while creating the subscriber, reported later from onTrigger.
+    private AtomicReference<Exception> storedException = new AtomicReference<>();
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+        pullRequest = PullRequest.newBuilder()
+                .setMaxMessages(batchSize)
+                .setReturnImmediately(false)
+                .setSubscription(getSubscriptionName(context))
+                .build();
+
+        try {
+            subscriber = getSubscriber(context);
+        } catch (IOException e) {
+            storedException.set(e);
+            getLogger().error("Failed to create Google Cloud Subscriber due to 
{}", new Object[]{e});
+        }
+    }
+
+    /**
+     * Shuts down the subscriber stub, if one was created.
+     */
+    @OnStopped
+    public void onStopped() {
+        if (subscriber == null) {
+            return;
+        }
+        subscriber.shutdown();
+    }
+
+    /**
+     * @return the fixed, immutable list of properties this processor exposes
+     */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return ImmutableList.<PropertyDescriptor>builder()
+                .add(PROJECT_ID)
+                .add(GCP_CREDENTIALS_PROVIDER_SERVICE)
+                .add(SUBSCRIPTION)
+                .add(BATCH_SIZE)
+                .build();
+    }
+
+    /**
+     * @return a single-element set holding only the success relationship
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> successOnly = Collections.singleton(REL_SUCCESS);
+        return successOnly;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        if (subscriber == null) {
+
+            if (storedException.get() != null) {
+                getLogger().error("Failed to create Google Cloud PubSub 
subscriber due to {}", new Object[]{storedException.get()});
+            } else {
+                getLogger().error("Google Cloud PubSub Subscriber was not 
properly created. Yielding the processor...");
+            }
+
+            context.yield();
+            return;
+        }
+
+        final PullResponse pullResponse = 
subscriber.pullCallable().call(pullRequest);
+        final List<String> ackIds = new ArrayList<>();
+
+        for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) 
{
+            if (message.hasMessage()) {
+                FlowFile flowFile = session.create();
+
+                final Map<String, String> attributes = new HashMap<>();
+                ackIds.add(message.getAckId());
+
+                attributes.put(ACK_ID_ATTRIBUTE, message.getAckId());
+                attributes.put(SERIALIZED_SIZE_ATTRIBUTE, 
String.valueOf(message.getSerializedSize()));
+                attributes.put(MESSAGE_ID_ATTRIBUTE, 
message.getMessage().getMessageId());
+                attributes.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE, 
String.valueOf(message.getMessage().getAttributesCount()));
+                attributes.put(MSG_PUBLISH_TIME_ATTRIBUTE, 
String.valueOf(message.getMessage().getPublishTime().getSeconds()));
+                attributes.putAll(message.getMessage().getAttributesMap());
+
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                flowFile = session.write(flowFile, out -> 
out.write(message.getMessage().getData().toByteArray()));
+
+                session.transfer(flowFile, REL_SUCCESS);
+                session.getProvenanceReporter().receive(flowFile, 
getSubscriptionName(context));
+            }
+        }
+
+        if (!ackIds.isEmpty()) {
+            AcknowledgeRequest acknowledgeRequest = 
AcknowledgeRequest.newBuilder()
+                    .addAllAckIds(ackIds)
+                    .setSubscription(getSubscriptionName(context))
+                    .build();
+            subscriber.acknowledgeCallable().call(acknowledgeRequest);
+        }
+    }
+
+    private String getSubscriptionName(ProcessContext context) {
+        final String subscriptionName = 
context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue();
+        final String projectId = context.getProperty(PROJECT_ID).getValue();
+
+        if (subscriptionName.contains("/")) {
+            return ProjectSubscriptionName.parse(subscriptionName).toString();
+        } else {
+            return ProjectSubscriptionName.of(projectId, 
subscriptionName).toString();
+        }
+
+    }
+
+    /**
+     * Creates a gRPC subscriber stub authenticated with the processor's Google credentials.
+     *
+     * @param context used to look up the credentials
+     * @return a newly created subscriber stub
+     * @throws IOException if the settings or the stub cannot be created
+     */
+    private SubscriberStub getSubscriber(ProcessContext context) throws IOException {
+        final FixedCredentialsProvider credentialsProvider = FixedCredentialsProvider.create(getGoogleCredentials(context));
+
+        final SubscriberStubSettings stubSettings = SubscriberStubSettings.newBuilder()
+                .setCredentialsProvider(credentialsProvider)
+                .build();
+
+        return GrpcSubscriberStub.create(stubSettings);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1663a6c0/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
new file mode 100644
index 0000000..6aaf04a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+/**
+ * FlowFile attribute names, and the matching documentation strings, used by the
+ * Google Cloud Pub/Sub processors.
+ */
+public class PubSubAttributes {
+
+    // --- Pub/Sub message id ---
+    public static final String MESSAGE_ID_ATTRIBUTE = "gcp.pubsub.messageId";
+    public static final String MESSAGE_ID_DESCRIPTION = "ID of the pubsub message published to the configured Google Cloud PubSub topic";
+
+    // --- topic the message was published to ---
+    public static final String TOPIC_NAME_ATTRIBUTE = "gcp.pubsub.topic";
+    public static final String TOPIC_NAME_DESCRIPTION = "Name of the Google Cloud PubSub topic the message was published to";
+
+    // --- acknowledgement id of a consumed message ---
+    public static final String ACK_ID_ATTRIBUTE = "gcp.pubsub.ackId";
+    public static final String ACK_ID_DESCRIPTION = "Acknowledgement Id of the consumed Google Cloud PubSub message";
+
+    // --- serialized size of a consumed message ---
+    public static final String SERIALIZED_SIZE_ATTRIBUTE = "gcp.pubsub.messageSize";
+    public static final String SERIALIZED_SIZE_DESCRIPTION = "Serialized size of the consumed Google Cloud PubSub message";
+
+    // --- number of attributes on a consumed message ---
+    public static final String MSG_ATTRIBUTES_COUNT_ATTRIBUTE = "gcp.pubsub.attributesCount";
+    public static final String MSG_ATTRIBUTES_COUNT_DESCRIPTION = "Number of attributes the consumed PubSub message has, if any";
+
+    // --- publish time of a consumed message ---
+    public static final String MSG_PUBLISH_TIME_ATTRIBUTE = "gcp.pubsub.publishTime";
+    public static final String MSG_PUBLISH_TIME_DESCRIPTION = "Timestamp value when the message was published";
+
+    // --- documentation placeholder for attributes copied from the message itself ---
+    public static final String DYNAMIC_ATTRIBUTES_ATTRIBUTE = "Dynamic Attributes";
+    public static final String DYNAMIC_ATTRIBUTES_DESCRIPTION = "Other than the listed attributes, this processor may write zero or more attributes, " +
+            "if the original Google Cloud Publisher client added any attributes to the message while sending";
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1663a6c0/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
new file mode 100644
index 0000000..79e8614
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.DeadlineExceededException;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
+
+@SeeAlso({ConsumeGCPubSub.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish"})
+@CapabilityDescription("Publishes the content of the incoming flowfile to the 
configured Google Cloud PubSub topic. The processor supports dynamic 
properties." +
+        " If any dynamic properties are present, they will be sent along with 
the message in the form of 'attributes'.")
+@DynamicProperty(name = "Attribute name", value = "Value to be set to the 
attribute",
+        description = "Attributes to be set for the outgoing Google Cloud 
PubSub message", expressionLanguageScope = 
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+@WritesAttributes({
+        @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = 
MESSAGE_ID_DESCRIPTION),
+        @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = 
TOPIC_NAME_DESCRIPTION)
+})
+public class PublishGCPubSub extends AbstractGCPubSubProcessor{
+
+    /**
+     * Required property naming the topic to publish to. Expression Language is
+     * evaluated against the variable registry.
+     */
+    public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder()
+            .name("gcp-pubsub-topic")
+            .displayName("Topic Name")
+            .description("Name of the Google Cloud PubSub Topic")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    /** Relationship for FlowFiles whose publish attempt failed but may succeed if retried. */
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+            .description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails but attempting the operation again may succeed.")
+            .name("retry")
+            .build();
+
+    // Pub/Sub publisher; stays null until onScheduled builds it successfully.
+    private Publisher publisher;
+    // Failure captured while building the publisher, reported later from onTrigger.
+    private AtomicReference<Exception> storedException = new AtomicReference<>();
+
+    /**
+     * @return the fixed, immutable list of properties this processor exposes
+     */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return ImmutableList.<PropertyDescriptor>builder()
+                .add(PROJECT_ID)
+                .add(GCP_CREDENTIALS_PROVIDER_SERVICE)
+                .add(TOPIC_NAME)
+                .add(BATCH_SIZE)
+                .build();
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .required(false)
+                .name(propertyDescriptorName)
+                .displayName(propertyDescriptorName)
+                
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                .dynamic(true)
+                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+    }
+
+    /**
+     * @return an unmodifiable set holding the success, failure and retry relationships
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> supported = new HashSet<>();
+        supported.add(REL_SUCCESS);
+        supported.add(REL_FAILURE);
+        supported.add(REL_RETRY);
+        return Collections.unmodifiableSet(supported);
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        try {
+            publisher = getPublisherBuilder(context).build();
+        } catch (IOException e) {
+            getLogger().error("Failed to create Google Cloud PubSub Publisher 
due to {}", new Object[]{e});
+            storedException.set(e);
+        }
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFiles = session.get(flowFileCount);
+
+        if (flowFiles.isEmpty() || publisher == null) {
+            if (storedException.get() != null) {
+                getLogger().error("Google Cloud PubSub Publisher was not 
properly created due to {}", new Object[]{storedException.get()});
+            }
+            context.yield();
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+        final List<FlowFile> successfulFlowFiles = new ArrayList<>();
+        final String topicName = getTopicName(context).toString();
+
+        try {
+            for (FlowFile flowFile : flowFiles) {
+                try {
+                    final ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+                    session.exportTo(flowFile, baos);
+                    final ByteString flowFileContent = 
ByteString.copyFromUtf8(baos.toString());
+
+                    PubsubMessage message = 
PubsubMessage.newBuilder().setData(flowFileContent)
+                            .setPublishTime(Timestamp.newBuilder().build())
+                            .putAllAttributes(getDynamicAttributesMap(context, 
flowFile))
+                            .build();
+
+                    ApiFuture<String> messageIdFuture = 
publisher.publish(message);
+
+                    while (messageIdFuture.isDone()) {
+                        Thread.sleep(500L);
+                    }
+
+                    final String messageId = messageIdFuture.get();
+                    final Map<String, String> attributes = new HashMap<>();
+
+                    attributes.put(MESSAGE_ID_ATTRIBUTE, messageId);
+                    attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+
+                    flowFile = session.putAllAttributes(flowFile, attributes);
+                    successfulFlowFiles.add(flowFile);
+                } catch (InterruptedException | ExecutionException e) {
+                    if (e.getCause() instanceof DeadlineExceededException) {
+                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}' due to {} but attempting again may succeed " +
+                                        "so routing to retry", new 
Object[]{topicName, e.getLocalizedMessage()}, e);
+                        session.transfer(flowFile, REL_RETRY);
+                    } else {
+                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}' due to {}",
+                                new Object[]{topicName, e});
+                        session.transfer(flowFile, REL_FAILURE);
+                        context.yield();
+                    }
+                }
+            }
+        } finally {
+            if (!successfulFlowFiles.isEmpty()) {
+                session.transfer(successfulFlowFiles, REL_SUCCESS);
+                for (FlowFile flowFile : successfulFlowFiles) {
+                    final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                    session.getProvenanceReporter().send(flowFile, topicName, 
transmissionMillis);
+                }
+            }
+        }
+    }
+
+    /**
+     * Lifecycle hook for processor stop; delegates to {@link #shutdownPublisher()}.
+     */
+    @OnStopped
+    public void onStopped() {
+        shutdownPublisher();
+    }
+
+    /**
+     * Shuts down the publisher if one exists. A failure during shutdown is logged as a
+     * warning and not rethrown.
+     */
+    private void shutdownPublisher() {
+        if (publisher == null) {
+            return;
+        }
+
+        try {
+            publisher.shutdown();
+        } catch (Exception e) {
+            getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Publisher due to {}", new Object[]{e});
+        }
+    }
+
+    /**
+     * Resolves the fully qualified topic name. A configured value containing '/' is parsed
+     * as a complete name; otherwise it is combined with the project id.
+     *
+     * @param context supplies the topic and project id properties
+     * @return the resolved topic name
+     */
+    private ProjectTopicName getTopicName(ProcessContext context) {
+        final String configuredTopic = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
+        final String projectId = context.getProperty(PROJECT_ID).getValue();
+
+        return configuredTopic.contains("/")
+                ? ProjectTopicName.parse(configuredTopic)
+                : ProjectTopicName.of(projectId, configuredTopic);
+    }
+
+    /**
+     * Collects the dynamic properties as message attributes, evaluating each value's
+     * Expression Language against the given FlowFile.
+     *
+     * @param context  supplies the configured properties
+     * @param flowFile the FlowFile the expressions are evaluated against
+     * @return a new map from dynamic property name to its evaluated value
+     */
+    private Map<String, String> getDynamicAttributesMap(ProcessContext context, FlowFile flowFile) {
+        final Map<String, String> dynamicAttributes = new HashMap<>();
+
+        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+            if (!descriptor.isDynamic()) {
+                continue;
+            }
+            final String evaluated = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
+            dynamicAttributes.put(descriptor.getName(), evaluated);
+        }
+
+        return dynamicAttributes;
+    }
+
+    private Publisher.Builder getPublisherBuilder(ProcessContext context) {
+        final Long batchSize = context.getProperty(BATCH_SIZE).asLong();
+
+        return Publisher.newBuilder(getTopicName(context))
+                
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
+                .setBatchingSettings(BatchingSettings.newBuilder()
+                .setElementCountThreshold(batchSize)
+                .setIsEnabled(true)
+                .build());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1663a6c0/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index b5d5df7..249d19e 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,4 +15,6 @@
 org.apache.nifi.processors.gcp.storage.PutGCSObject
 org.apache.nifi.processors.gcp.storage.FetchGCSObject
 org.apache.nifi.processors.gcp.storage.DeleteGCSObject
-org.apache.nifi.processors.gcp.storage.ListGCSBucket
\ No newline at end of file
+org.apache.nifi.processors.gcp.storage.ListGCSBucket
+org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub
+org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/1663a6c0/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubIT.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubIT.java
new file mode 100644
index 0000000..933e678
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubIT.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Common base for the Google Cloud Pub/Sub integration tests. It holds the shared
+ * {@link TestRunner} and registers a {@link GCPCredentialsControllerService} on it.
+ */
+public class AbstractGCPubSubIT {
+
+    protected static final String PROJECT_ID = "my-gcm-client";
+    protected static final String CONTROLLER_SERVICE = "GCPCredentialsService";
+    protected static TestRunner runner;
+
+    /**
+     * Adds, enables and validates a credentials controller service on the given runner
+     * under the identifier {@link #CONTROLLER_SERVICE}.
+     *
+     * @param runner the test runner to register the controller service with
+     * @return the same runner instance that was passed in
+     * @throws InitializationException if the controller service cannot be added
+     */
+    protected TestRunner setCredentialsCS(TestRunner runner) throws InitializationException {
+        // NOTE(review): this looks like a placeholder path - presumably it must be replaced
+        // with a real service account JSON file before running the tests; confirm.
+        final String credentialsJsonPath = "path/to/credentials/json";
+
+        final Map<String, String> serviceProperties = new HashMap<>();
+        serviceProperties.put("application-default-credentials", "false");
+        serviceProperties.put("compute-engine-credentials", "false");
+        serviceProperties.put("service-account-json-file", credentialsJsonPath);
+
+        final GCPCredentialsControllerService credentialsService = new GCPCredentialsControllerService();
+        runner.addControllerService(CONTROLLER_SERVICE, credentialsService, serviceProperties);
+        runner.enableControllerService(credentialsService);
+        runner.assertValid(credentialsService);
+
+        return runner;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1663a6c0/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java
new file mode 100644
index 0000000..e36d31c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunners;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
+
+/**
+ * Integration tests for {@link ConsumeGCPubSub}, run against the shared test runner
+ * inherited from {@link AbstractGCPubSubIT}.
+ * NOTE(review): these presumably require a reachable subscription holding enough pending
+ * messages, plus valid credentials in the base class - confirm before running.
+ */
+public class ConsumeGCPubSubIT extends AbstractGCPubSubIT {
+
+    @BeforeClass
+    public static void setup() throws InitializationException {
+        runner = TestRunners.newTestRunner(ConsumeGCPubSub.class);
+    }
+
+    /** Consumes with a batch size of 10 and expects 10 FlowFiles on success. */
+    @Test
+    public void testSimpleConsume() throws InitializationException {
+        configureConsumer("my-sub", "10");
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConsumeGCPubSub.REL_SUCCESS, 10);
+        assertPubSubAttributesPresent();
+    }
+
+    /** Runs twice with a batch size of 2; the transferred count is checked after each run. */
+    @Test
+    public void testConsumeWithBatchSize() throws InitializationException {
+        configureConsumer("my-sub", "2");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConsumeGCPubSub.REL_SUCCESS, 2);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConsumeGCPubSub.REL_SUCCESS, 4);
+
+        assertPubSubAttributesPresent();
+    }
+
+    /** Uses the fully formatted "projects/.../subscriptions/..." subscription name. */
+    @Test
+    public void testConsumeWithFormattedSubscriptionName() throws InitializationException {
+        configureConsumer("projects/my-gcm-client/subscriptions/my-sub", "2");
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConsumeGCPubSub.REL_SUCCESS, 2);
+        assertPubSubAttributesPresent();
+    }
+
+    /**
+     * Clears the transfer state, registers the credentials service, sets the processor
+     * properties and asserts that the resulting configuration is valid.
+     */
+    private void configureConsumer(final String subscription, final String batchSize) throws InitializationException {
+        runner.clearTransferState();
+
+        runner = setCredentialsCS(runner);
+
+        runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID);
+        runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
+        runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription);
+        runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, batchSize);
+
+        runner.assertValid();
+    }
+
+    /** Asserts that every transferred FlowFile carries the three checked Pub/Sub attributes. */
+    private void assertPubSubAttributesPresent() {
+        runner.assertAllFlowFilesContainAttribute(ACK_ID_ATTRIBUTE);
+        runner.assertAllFlowFilesContainAttribute(MSG_ATTRIBUTES_COUNT_ATTRIBUTE);
+        runner.assertAllFlowFilesContainAttribute(MSG_PUBLISH_TIME_ATTRIBUTE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1663a6c0/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java
new file mode 100644
index 0000000..b774db8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunners;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
+import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
+
+/**
+ * Integration tests for {@link PublishGCPubSub}, run against the shared test runner
+ * inherited from {@link AbstractGCPubSubIT}.
+ * NOTE(review): these presumably require an existing topic and valid credentials in the
+ * base class - confirm before running.
+ */
+public class PublishGCPubSubIT extends AbstractGCPubSubIT {
+
+    @BeforeClass
+    public static void setup() throws InitializationException {
+        runner = TestRunners.newTestRunner(PublishGCPubSub.class);
+    }
+
+    /** Publishes one FlowFile using the short topic name. */
+    @Test
+    public void testSimplePublish() throws InitializationException {
+        configurePublisher("my-topic");
+
+        runner.enqueue("Testing simple publish");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1);
+        runner.assertAllFlowFilesContainAttribute(MESSAGE_ID_ATTRIBUTE);
+        runner.assertAllFlowFilesContainAttribute(TOPIC_NAME_ATTRIBUTE);
+    }
+
+    /** Publishes one FlowFile using the fully formatted "projects/.../topics/..." name. */
+    @Test
+    public void testPublishWithFormattedTopicName() throws InitializationException {
+        configurePublisher("projects/my-gcm-client/topics/my-topic");
+
+        runner.enqueue("Testing publish with formatted topic name");
+        runner.run();
+
+        // Exactly one FlowFile is expected: the transfer state is cleared in configurePublisher,
+        // so the result no longer depends on testSimplePublish having run first.
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1);
+        runner.assertAllFlowFilesContainAttribute(MESSAGE_ID_ATTRIBUTE);
+        runner.assertAllFlowFilesContainAttribute(TOPIC_NAME_ATTRIBUTE);
+    }
+
+    /**
+     * Clears the transfer state left by any earlier test on the shared runner, registers the
+     * credentials service, sets the processor properties and asserts the configuration is valid.
+     *
+     * @param topic the value for the TOPIC_NAME property
+     * @throws InitializationException if the credentials controller service cannot be added
+     */
+    private void configurePublisher(final String topic) throws InitializationException {
+        // The runner is static and shared between tests; without this, transferred FlowFiles
+        // accumulate and the expected counts would depend on test execution order.
+        runner.clearTransferState();
+
+        runner = setCredentialsCS(runner);
+
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT_ID);
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, topic);
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
+        runner.setProperty(PublishGCPubSub.BATCH_SIZE, "1");
+
+        runner.assertValid();
+    }
+}

Reply via email to