Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2724#discussion_r191079620 --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java --- @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub; + +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; +import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; +import com.google.common.collect.ImmutableList; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION; + +@SeeAlso({PublishGCPubSub.class}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"google", "google-cloud", "pubsub", "consume"}) +@CapabilityDescription("Consumes message from the configured Google Cloud PubSub subscription. If the 'Batch Size' is set, " + + "the configured number of messages will be pulled in a single request, else only one message will be pulled.") +@WritesAttributes({ + @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = ACK_ID_DESCRIPTION), + @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, description = SERIALIZED_SIZE_DESCRIPTION), + @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = MSG_ATTRIBUTES_COUNT_DESCRIPTION), + @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION), + @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION) +}) +public class ConsumeGCPubSub extends AbstractGCPubSubProcessor { + + public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() + .name("gcp-pubsub-subscription") + .displayName("Subscription") + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .description("Subscription name of the Google Cloud Pub/Sub") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + private SubscriberStub subscriber = null; + private PullRequest pullRequest; + + @OnScheduled + public void onScheduled(ProcessContext context) { + final Integer batchSize = (context.getProperty(BATCH_SIZE).isSet()) ? context.getProperty(BATCH_SIZE).asInteger() : 1; + + pullRequest = PullRequest.newBuilder() + .setMaxMessages(batchSize) + .setReturnImmediately(false) + .setSubscription(getSubscriptionName(context)) + .build(); + + try { + subscriber = getSubscriber(context); + } catch (IOException e) { + getLogger().error("Failed to create Google Cloud Subscriber due to ", e); + } + } + + @OnStopped + public void onStopped() { + if (subscriber != null) { + subscriber.shutdown(); + } + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return ImmutableList.of(PROJECT_ID, + GCP_CREDENTIALS_PROVIDER_SERVICE, + SUBSCRIPTION, + BATCH_SIZE); + } + + @Override + public Set<Relationship> getRelationships() { + return Collections.singleton(REL_SUCCESS); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + if (subscriber == null) { --- End diff -- It's done.
---