This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 0cad418874dd4d156ecb2a9604872c0b301f5030 Author: Pierre Villard <[email protected]> AuthorDate: Mon Oct 28 15:43:42 2024 +0100 NIFI-13544 Align number of Executors with Concurrent Tasks in PublishGCPubSub (#9454) Co-authored-by: Paul-Adrien Cordonnier <[email protected]> Signed-off-by: David Handermann <[email protected]> --- .../java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java index 2a93817e87..9654f7553d 100644 --- a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java +++ b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java @@ -22,6 +22,7 @@ import com.google.api.core.ApiFutures; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.core.FixedExecutorProvider; import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub; import com.google.cloud.pubsub.v1.stub.PublisherStubSettings; @@ -82,6 +83,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.ScheduledThreadPoolExecutor; import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE; import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION; @@ -488,6 +490,10 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor { .setDelayThreshold(Duration.ofMillis(batchDelayThreshold)) .setIsEnabled(true) .build()); + + // Set fixed thread pool executor to number of concurrent tasks + publisherBuilder.setExecutorProvider(FixedExecutorProvider.create(new ScheduledThreadPoolExecutor(context.getMaxConcurrentTasks()))); + return publisherBuilder; } }
