This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 0cad418874dd4d156ecb2a9604872c0b301f5030
Author: Pierre Villard <[email protected]>
AuthorDate: Mon Oct 28 15:43:42 2024 +0100

    NIFI-13544 Align number of Executors with Concurrent Tasks in 
PublishGCPubSub (#9454)
    
    Co-authored-by: Paul-Adrien Cordonnier <[email protected]>
    Signed-off-by: David Handermann <[email protected]>
---
 .../java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
index 2a93817e87..9654f7553d 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
@@ -22,6 +22,7 @@ import com.google.api.core.ApiFutures;
 import com.google.api.gax.batching.BatchingSettings;
 import com.google.api.gax.core.FixedCredentialsProvider;
 import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.core.FixedExecutorProvider;
 import com.google.cloud.pubsub.v1.Publisher;
 import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub;
 import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
@@ -82,6 +83,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
 import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
@@ -488,6 +490,10 @@ public class PublishGCPubSub extends 
AbstractGCPubSubWithProxyProcessor {
                 .setDelayThreshold(Duration.ofMillis(batchDelayThreshold))
                 .setIsEnabled(true)
                 .build());
+
+        // Set fixed thread pool executor to number of concurrent tasks
+        publisherBuilder.setExecutorProvider(FixedExecutorProvider.create(new 
ScheduledThreadPoolExecutor(context.getMaxConcurrentTasks())));
+
         return publisherBuilder;
     }
 }

Reply via email to