[
https://issues.apache.org/jira/browse/NIFI-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16484021#comment-16484021
]
ASF GitHub Bot commented on NIFI-5133:
--------------------------------------
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2724#discussion_r189904067
--- Diff:
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.common.collect.ImmutableList;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION;
+
+@SeeAlso({PublishGCPubSub.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"google", "google-cloud", "pubsub", "consume"})
+@CapabilityDescription("Consumes message from the configured Google Cloud
PubSub subscription. If the 'Batch Size' is set, " +
+ "the configured number of messages will be pulled in a single
request, else only one message will be pulled.")
+@WritesAttributes({
+ @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description =
ACK_ID_DESCRIPTION),
+ @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE,
description = SERIALIZED_SIZE_DESCRIPTION),
+ @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE,
description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
+ @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE,
description = MSG_PUBLISH_TIME_DESCRIPTION),
+ @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE,
description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
+})
+public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
+
+ public static final PropertyDescriptor SUBSCRIPTION = new
PropertyDescriptor.Builder()
+ .name("gcp-pubsub-subscription")
+ .displayName("Subscription")
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .description("Subscription name of the Google Cloud Pub/Sub")
+ .required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ private SubscriberStub subscriber = null;
+ private PullRequest pullRequest;
+
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+ final Integer batchSize =
(context.getProperty(BATCH_SIZE).isSet()) ?
context.getProperty(BATCH_SIZE).asInteger() : 1;
+
+ pullRequest = PullRequest.newBuilder()
+ .setMaxMessages(batchSize)
+ .setReturnImmediately(false)
+ .setSubscription(getSubscriptionName(context))
+ .build();
+
+ try {
+ subscriber = getSubscriber(context);
+ } catch (IOException e) {
+ getLogger().error("Failed to create Google Cloud Subscriber
due to ", e);
+ }
+ }
+
+ @OnStopped
+ public void onStopped() {
+ if (subscriber != null) {
+ subscriber.shutdown();
+ }
+ }
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return ImmutableList.of(PROJECT_ID,
+ GCP_CREDENTIALS_PROVIDER_SERVICE,
+ SUBSCRIPTION,
+ BATCH_SIZE);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ if (subscriber == null) {
--- End diff --
if subscriber is null, it means that something went wrong in the
@OnScheduled method, right? If so, could be a good idea to log an error here,
no? (so that a bulletin is generated to warn users)
> Create a processor to Publish and Subscribe to Google Pub/Sub
> -------------------------------------------------------------
>
> Key: NIFI-5133
> URL: https://issues.apache.org/jira/browse/NIFI-5133
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Affects Versions: 1.6.0
> Reporter: Abdelkrim Hadjidj
> Assignee: Sivaprasanna Sethuraman
> Priority: Major
>
> As a workflow designer, I would like to publish/subscribe messages to/from
> Google Pub/Sub. This integration can enable several ingestion and realtime
> use case where data is available on Google Cloud.
> There are few options that are outside the NiFi project:
> [https://github.com/ammitt90/nifi-pubsub-processor]
> [https://github.com/synack/nifi-gcp-pubsub-publisher]
> [https://github.com/synack/nifi-gcp-pubsub-consumer]
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)