This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new f7d3b75bcd NIFI-11150 Add Service Account JSON credentials support to Google Pub/Sub Lite processors f7d3b75bcd is described below commit f7d3b75bcd59d7bf8652d14581b19414f0ac23d9 Author: Robert Kalmar <rfrostkal...@gmail.com> AuthorDate: Wed Feb 8 12:38:25 2023 +0100 NIFI-11150 Add Service Account JSON credentials support to Google Pub/Sub Lite processors This closes #6933. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../processors/gcp/pubsub/lite/ConsumeGCPubSubLite.java | 14 +++++++++----- .../processors/gcp/pubsub/lite/PublishGCPubSubLite.java | 13 +++++++++---- .../org/apache/nifi/processors/gcp/util/GoogleUtils.java | 3 +++ 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite.java index ac40fb79df..d0797fee42 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.gcp.pubsub.lite; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.rpc.ApiException; +import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsublite.SubscriptionPath; @@ -50,7 +51,6 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor; -import 
java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -72,13 +72,12 @@ import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION; import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_ATTRIBUTE; import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_DESCRIPTION; +import static org.apache.nifi.processors.gcp.util.GoogleUtils.GOOGLE_CLOUD_PLATFORM_SCOPE; @SeeAlso({PublishGCPubSubLite.class}) @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @Tags({"google", "google-cloud", "gcp", "message", "pubsub", "consume", "lite"}) -@CapabilityDescription("Consumes message from the configured Google Cloud PubSub Lite subscription. In its current state, this processor " - + "will only work if running on a Google Cloud Compute Engine instance and if using the GCP Credentials Controller Service with " - + "'Use Application Default Credentials' or 'Use Compute Engine Credentials'.") +@CapabilityDescription("Consumes message from the configured Google Cloud PubSub Lite subscription.") @WritesAttributes({ @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION), @WritesAttribute(attribute = ORDERING_KEY_ATTRIBUTE, description = ORDERING_KEY_DESCRIPTION), @@ -219,7 +218,7 @@ public class ConsumeGCPubSubLite extends AbstractGCPubSubProcessor implements Ve message.getConsumer().ack(); } - private Subscriber getSubscriber(final ProcessContext context) throws IOException { + private Subscriber getSubscriber(final ProcessContext context) { final SubscriptionPath subscriptionPath = SubscriptionPath.parse(context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue()); @@ -286,4 +285,9 @@ public class ConsumeGCPubSubLite extends AbstractGCPubSubProcessor implements Ve } return verificationResults; } + + 
@Override + protected GoogleCredentials getGoogleCredentials(final ProcessContext context) { + return super.getGoogleCredentials(context).createScoped(GOOGLE_CLOUD_PLATFORM_SCOPE); + } } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java index 55edee02f7..603cfb954a 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java @@ -21,6 +21,7 @@ import com.google.api.core.ApiFutures; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.rpc.ApiException; +import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.cloudpubsub.Publisher; import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings; @@ -75,14 +76,13 @@ import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION; import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE; import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION; +import static org.apache.nifi.processors.gcp.util.GoogleUtils.GOOGLE_CLOUD_PLATFORM_SCOPE; @SeeAlso({ConsumeGCPubSubLite.class}) @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish", "lite"}) -@CapabilityDescription("Publishes the content of the incoming flowfile to the configured Google Cloud PubSub Lite topic. 
The processor supports dynamic properties." + - " If any dynamic properties are present, they will be sent along with the message in the form of 'attributes'. In its current state, this processor will " + - "only work if running on a Google Cloud Compute Engine instance and if using the GCP Credentials Controller Service with 'Use Application Default " + - "Credentials' or 'Use Compute Engine Credentials'.") +@CapabilityDescription("Publishes the content of the incoming FlowFile to the configured Google Cloud PubSub Lite topic. The processor supports dynamic properties." + + " If any dynamic properties are present, they will be sent along with the message in the form of 'attributes'.") @DynamicProperty(name = "Attribute name", value = "Value to be set to the attribute", description = "Attributes to be set for the outgoing Google Cloud PubSub Lite message", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) @WritesAttributes({ @@ -321,4 +321,9 @@ public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements Ve } return verificationResults; } + + @Override + protected GoogleCredentials getGoogleCredentials(final ProcessContext context) { + return super.getGoogleCredentials(context).createScoped(GOOGLE_CLOUD_PLATFORM_SCOPE); + } } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/util/GoogleUtils.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/util/GoogleUtils.java index 2d7fd1a112..cebfa4419f 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/util/GoogleUtils.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/util/GoogleUtils.java @@ -20,6 +20,9 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.gcp.credentials.service.GCPCredentialsService; public class GoogleUtils { + + 
public static final String GOOGLE_CLOUD_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform"; + /** * Links to the {@link GCPCredentialsService} which provides credentials for this particular processor. */