exceptionfactory commented on code in PR #7274:
URL: https://github.com/apache/nifi/pull/7274#discussion_r1220305463


##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/NiFiApiFutureCallback.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.publish;
+
+import com.google.api.core.ApiFutureCallback;
+
+import java.util.List;
+
+/**
+ * Specialization of {@link ApiFutureCallback} used to track Google PubSub 
send results.  Failure
+ * exceptions are captured to facilitate FlowFile routing decisions.
+ */
+public class NiFiApiFutureCallback implements ApiFutureCallback<String> {

Review Comment:
   Rather than using `NiFi` as the suffix, recommend something more specific to 
the function, such as `TrackedApiFutureCallback`



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.UnavailableException;
+import com.google.cloud.pubsub.v1.Publisher;
+import io.grpc.Status;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processor.ProcessContext;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.processors.gcp.pubsub.publish.ContentInputStrategy;
+import org.apache.nifi.processors.gcp.pubsub.publish.NiFiApiFutureCallback;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PublishGCPubSubTest {
+
+    private Throwable throwable;
+    private Publisher publisherMock;
+    private TestRunner runner;
+
+    @BeforeEach
+    void setRunner() {
+        throwable = null;
+        publisherMock = mock(Publisher.class);
+        runner = TestRunners.newTestRunner(new PublishGCPubSub() {
+            @Override
+            @OnScheduled
+            public void onScheduled(ProcessContext context) {
+                publisher = publisherMock;
+            }
+
+            @Override
+            protected void addCallback(ApiFuture<String> apiFuture, 
ApiFutureCallback<? super String> callback, Executor executor) {
+                if (callback instanceof NiFiApiFutureCallback) {
+                    final NiFiApiFutureCallback niFiApiFutureCallback = 
(NiFiApiFutureCallback) callback;
+                    if (throwable == null) {
+                        
niFiApiFutureCallback.onSuccess(Long.toString(System.currentTimeMillis()));
+                    } else {
+                        niFiApiFutureCallback.onFailure(throwable);
+                    }
+                }
+            }
+        });
+    }
+
+    @Test
+    void testPropertyDescriptors() throws InitializationException {
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, "my-topic");
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, "my-project");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost:443");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "-1");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "15");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3 MB");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100 
millis");
+        runner.assertValid();
+    }
+
+    @Test
+    void testSendOne_Success_FlowFileStrategy() throws InitializationException 
{

Review Comment:
   Most test methods do no include the `_` character, following standard Java 
method naming conventions.



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -213,62 +290,143 @@ public List<ConfigVerificationResult> verify(final 
ProcessContext context, final
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
-        final List<FlowFile> flowFiles = session.get(flowFileCount);
-
-        if (flowFiles.isEmpty() || publisher == null) {
-            if (storedException.get() != null) {
-                getLogger().error("Google Cloud PubSub Publisher was not 
properly created due to {}", new Object[]{storedException.get()});
-            }
+        final StopWatch stopWatch = new StopWatch(true);
+        final ContentInputStrategy inputStrategy = 
ContentInputStrategy.valueOf(context.getProperty(CONTENT_INPUT_STRATEGY).getValue());
+        final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
+        if (flowFileBatch.isEmpty()) {
             context.yield();
-            return;
+        } else if 
(ContentInputStrategy.FLOWFILE_ORIENTED.equals(inputStrategy)) {
+            onTriggerFlowFileStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else if (ContentInputStrategy.RECORD_ORIENTED.equals(inputStrategy)) 
{
+            onTriggerRecordStrategy(context, session, stopWatch, 
flowFileBatch);
+        } else {
+            throw new IllegalStateException(inputStrategy.getValue());
         }
+    }
 
-        final long startNanos = System.nanoTime();
-        final List<FlowFile> successfulFlowFiles = new ArrayList<>();
-        final String topicName = getTopicName(context).toString();
+    private void onTriggerFlowFileStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
+        final long maxMessageSize = 
context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
 
-        try {
-            for (FlowFile flowFile : flowFiles) {
-                try {
-                    final ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
-                    session.exportTo(flowFile, baos);
-                    final ByteString flowFileContent = 
ByteString.copyFrom(baos.toByteArray());
-
-                    PubsubMessage message = 
PubsubMessage.newBuilder().setData(flowFileContent)
-                            .setPublishTime(Timestamp.newBuilder().build())
-                            .putAllAttributes(getDynamicAttributesMap(context, 
flowFile))
-                            .build();
+        final Executor executor = MoreExecutors.directExecutor();
+        final List<FlowFileResult> flowFileResults = new ArrayList<>();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
-                    ApiFuture<String> messageIdFuture = 
publisher.publish(message);
+        for (final FlowFile flowFile : flowFileBatch) {
+            final List<ApiFuture<String>> futures = new ArrayList<>();
+            final List<String> successes = new ArrayList<>();
+            final List<Throwable> failures = new ArrayList<>();
 
-                    final Map<String, String> attributes = new HashMap<>();
-                    attributes.put(MESSAGE_ID_ATTRIBUTE, 
messageIdFuture.get());
-                    attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+            if (flowFile.getSize() > maxMessageSize) {
+                final String message = String.format("FlowFile size %d exceeds 
MAX_MESSAGE_SIZE", flowFile.getSize());
+                failures.add(new IllegalArgumentException(message));
+                flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures));
+            } else {
+                baos.reset();
+                session.exportTo(flowFile, baos);
 
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                    successfulFlowFiles.add(flowFile);
-                } catch (InterruptedException | ExecutionException e) {
-                    if (e.getCause() instanceof DeadlineExceededException) {
-                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}' due to {} but attempting again may succeed " +
-                                        "so routing to retry", new 
Object[]{topicName, e.getLocalizedMessage()}, e);
-                        session.transfer(flowFile, REL_RETRY);
-                    } else {
-                        getLogger().error("Failed to publish the message to 
Google Cloud PubSub topic '{}'", topicName, e);
-                        session.transfer(flowFile, REL_FAILURE);
-                    }
-                    context.yield();
-                }
+                final ApiFuture<String> apiFuture = publishOneMessage(context, 
flowFile, baos.toByteArray());
+                futures.add(apiFuture);
+                addCallback(apiFuture, new NiFiApiFutureCallback(successes, 
failures), executor);
+                flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures));
             }
-        } finally {
-            if (!successfulFlowFiles.isEmpty()) {
-                session.transfer(successfulFlowFiles, REL_SUCCESS);
-                for (FlowFile flowFile : successfulFlowFiles) {
-                    final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                    session.getProvenanceReporter().send(flowFile, topicName, 
transmissionMillis);
-                }
+        }
+        finishBatch(session, stopWatch, flowFileResults);
+    }
+
+    private void onTriggerRecordStrategy(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final List<FlowFile> flowFileBatch) throws ProcessException {
+        try {
+            onTriggerRecordStrategyInner(context, session, stopWatch, 
flowFileBatch);
+        } catch (IOException | SchemaNotFoundException | 
MalformedRecordException e) {
+            throw new ProcessException("Record publishing failed", e);
+        }
+    }
+
+    private void onTriggerRecordStrategyInner(

Review Comment:
   It seems like another name instead of the `Inner` suffix would be better. 
Perhaps `processFlowFileRecords` or something along those lines?



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.UnavailableException;
+import com.google.cloud.pubsub.v1.Publisher;
+import io.grpc.Status;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processor.ProcessContext;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.processors.gcp.pubsub.publish.ContentInputStrategy;
+import org.apache.nifi.processors.gcp.pubsub.publish.NiFiApiFutureCallback;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PublishGCPubSubTest {
+
+    private Throwable throwable;
+    private Publisher publisherMock;
+    private TestRunner runner;
+
+    @BeforeEach
+    void setRunner() {
+        throwable = null;
+        publisherMock = mock(Publisher.class);
+        runner = TestRunners.newTestRunner(new PublishGCPubSub() {
+            @Override
+            @OnScheduled
+            public void onScheduled(ProcessContext context) {
+                publisher = publisherMock;
+            }
+
+            @Override
+            protected void addCallback(ApiFuture<String> apiFuture, 
ApiFutureCallback<? super String> callback, Executor executor) {
+                if (callback instanceof NiFiApiFutureCallback) {
+                    final NiFiApiFutureCallback niFiApiFutureCallback = 
(NiFiApiFutureCallback) callback;
+                    if (throwable == null) {
+                        
niFiApiFutureCallback.onSuccess(Long.toString(System.currentTimeMillis()));
+                    } else {
+                        niFiApiFutureCallback.onFailure(throwable);
+                    }
+                }
+            }
+        });
+    }
+
+    @Test
+    void testPropertyDescriptors() throws InitializationException {
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, "my-topic");
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, "my-project");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost:443");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "-1");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "15");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3 MB");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100 
millis");
+        runner.assertValid();
+    }
+
+    @Test
+    void testSendOne_Success_FlowFileStrategy() throws InitializationException 
{
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, "my-topic");
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, "my-project");
+
+        runner.enqueue("text");
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(PublishGCPubSub.REL_SUCCESS).iterator().next();
+        
assertNotNull(flowFile.getAttribute(PubSubAttributes.MESSAGE_ID_ATTRIBUTE));
+    }
+
+    @Test
+    void testSendOne_Retry_FlowFileStrategy() throws InitializationException {
+        throwable = new UnavailableException(null, 
GrpcStatusCode.of(Status.Code.UNAVAILABLE), true);
+
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, "my-topic");
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, "my-project");
+
+        runner.enqueue("text");
+        runner.run(1, true, true);

Review Comment:
   ```suggestion
           runner.run();
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.UnavailableException;
+import com.google.cloud.pubsub.v1.Publisher;
+import io.grpc.Status;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processor.ProcessContext;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.processors.gcp.pubsub.publish.ContentInputStrategy;
+import org.apache.nifi.processors.gcp.pubsub.publish.NiFiApiFutureCallback;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PublishGCPubSubTest {
+
+    private Throwable throwable;
+    private Publisher publisherMock;
+    private TestRunner runner;
+
+    @BeforeEach
+    void setRunner() {
+        throwable = null;
+        publisherMock = mock(Publisher.class);
+        runner = TestRunners.newTestRunner(new PublishGCPubSub() {
+            @Override
+            @OnScheduled
+            public void onScheduled(ProcessContext context) {
+                publisher = publisherMock;
+            }
+
+            @Override
+            protected void addCallback(ApiFuture<String> apiFuture, 
ApiFutureCallback<? super String> callback, Executor executor) {
+                if (callback instanceof NiFiApiFutureCallback) {
+                    final NiFiApiFutureCallback niFiApiFutureCallback = 
(NiFiApiFutureCallback) callback;
+                    if (throwable == null) {
+                        
niFiApiFutureCallback.onSuccess(Long.toString(System.currentTimeMillis()));
+                    } else {
+                        niFiApiFutureCallback.onFailure(throwable);
+                    }
+                }
+            }
+        });
+    }
+
+    @Test
+    void testPropertyDescriptors() throws InitializationException {
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, "my-topic");
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, "my-project");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost:443");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "-1");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "15");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3 MB");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100 
millis");
+        runner.assertValid();
+    }
+
+    @Test
+    void testSendOne_Success_FlowFileStrategy() throws InitializationException 
{
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, "my-topic");
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, "my-project");
+
+        runner.enqueue("text");
+        runner.run(1, true, true);

Review Comment:
   This is the same as `TestRunner.run()`, so recommend removing the arguments 
and using the default behavior.
   ```suggestion
           runner.run();
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.UnavailableException;
+import com.google.cloud.pubsub.v1.Publisher;
+import io.grpc.Status;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processor.ProcessContext;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.processors.gcp.pubsub.publish.ContentInputStrategy;
+import org.apache.nifi.processors.gcp.pubsub.publish.NiFiApiFutureCallback;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PublishGCPubSubTest {
+
+    private Throwable throwable;
+    private Publisher publisherMock;
+    private TestRunner runner;
+
+    @BeforeEach
+    void setRunner() {
+        throwable = null;
+        publisherMock = mock(Publisher.class);
+        runner = TestRunners.newTestRunner(new PublishGCPubSub() {
+            @Override
+            @OnScheduled
+            public void onScheduled(ProcessContext context) {
+                publisher = publisherMock;
+            }
+
+            @Override
+            protected void addCallback(ApiFuture<String> apiFuture, 
ApiFutureCallback<? super String> callback, Executor executor) {
+                if (callback instanceof NiFiApiFutureCallback) {
+                    final NiFiApiFutureCallback niFiApiFutureCallback = 
(NiFiApiFutureCallback) callback;
+                    if (throwable == null) {
+                        
niFiApiFutureCallback.onSuccess(Long.toString(System.currentTimeMillis()));
+                    } else {
+                        niFiApiFutureCallback.onFailure(throwable);
+                    }
+                }
+            }
+        });
+    }
+
+    @Test
+    void testPropertyDescriptors() throws InitializationException {
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, "my-topic");
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, "my-project");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost:443");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "-1");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "15");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3 MB");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100 
millis");
+        runner.assertValid();
+    }
+
+    @Test
+    void testSendOne_Success_FlowFileStrategy() throws InitializationException 
{
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, "my-topic");
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, "my-project");
+
+        runner.enqueue("text");

Review Comment:
   Recommend creating a static variable and reusing that for the input content 
across all methods.



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.UnavailableException;
+import com.google.cloud.pubsub.v1.Publisher;
+import io.grpc.Status;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processor.ProcessContext;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.processors.gcp.pubsub.publish.ContentInputStrategy;
+import org.apache.nifi.processors.gcp.pubsub.publish.NiFiApiFutureCallback;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PublishGCPubSubTest {
+
+    private Throwable throwable;
+    private Publisher publisherMock;
+    private TestRunner runner;
+
+    @BeforeEach
+    void setRunner() {
+        throwable = null;
+        publisherMock = mock(Publisher.class);
+        runner = TestRunners.newTestRunner(new PublishGCPubSub() {
+            @Override
+            @OnScheduled
+            public void onScheduled(ProcessContext context) {
+                publisher = publisherMock;
+            }
+
+            @Override
+            protected void addCallback(ApiFuture<String> apiFuture, 
ApiFutureCallback<? super String> callback, Executor executor) {
+                if (callback instanceof NiFiApiFutureCallback) {
+                    final NiFiApiFutureCallback niFiApiFutureCallback = 
(NiFiApiFutureCallback) callback;
+                    if (throwable == null) {
+                        
niFiApiFutureCallback.onSuccess(Long.toString(System.currentTimeMillis()));
+                    } else {
+                        niFiApiFutureCallback.onFailure(throwable);
+                    }
+                }
+            }
+        });
+    }
+
+    @Test
+    void testPropertyDescriptors() throws InitializationException {
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, "my-topic");
+        runner.assertNotValid();
+
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, "my-project");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost:443");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "-1");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "15");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3 MB");
+        runner.assertValid();
+
+        runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100");
+        runner.assertNotValid();
+        runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100 
millis");
+        runner.assertValid();
+    }
+
+    @Test
+    void testSendOne_Success_FlowFileStrategy() throws InitializationException 
{
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, "my-topic");
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, "my-project");
+
+        runner.enqueue("text");
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(PublishGCPubSub.REL_SUCCESS).iterator().next();
+        
assertNotNull(flowFile.getAttribute(PubSubAttributes.MESSAGE_ID_ATTRIBUTE));
+    }
+
+    @Test
+    void testSendOne_Retry_FlowFileStrategy() throws InitializationException {
+        throwable = new UnavailableException(null, 
GrpcStatusCode.of(Status.Code.UNAVAILABLE), true);
+
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, "my-topic");
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, "my-project");
+
+        runner.enqueue("text");
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_RETRY, 1);
+    }
+
+
+    @Test
+    void testSendOne_Failure_FlowFileStrategy() throws InitializationException 
{
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, "my-topic");
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, "my-project");
+        runner.setProperty(PublishGCPubSub.MAX_MESSAGE_SIZE, "16 B");
+        runner.enqueue("some really long text");
+
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_FAILURE, 1);
+    }
+
+    @Test
+    void testSendOne_Success_RecordStrategy() throws InitializationException, 
IOException {
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, "my-topic");
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, "my-project");

Review Comment:
   These initial `setProperty()` calls repeat the same values, so they could be 
pulled out to a private method that could be called when needed in each test 
method.



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.publish;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.rpc.UnavailableException;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.gcp.pubsub.PubSubAttributes;
+import org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Tracking of an interaction from NiFi {@link PublishGCPubSub} processor to 
Google PubSub endpoint.
+ */
+public class FlowFileResult {
+    private static final Logger logger = 
LoggerFactory.getLogger(FlowFileResult.class);
+
+    private final FlowFile flowFile;
+    private final Map<String, String> attributes;
+    private final List<ApiFuture<String>> futures;
+    private final List<String> successes;
+    private final List<Throwable> failures;
+
+    public FlowFileResult(final FlowFile flowFile, final 
List<ApiFuture<String>> futures,
+                          final List<String> successes, final List<Throwable> 
failures) {
+        this.flowFile = flowFile;
+        this.attributes = new HashMap<>();
+        this.futures = futures;
+        this.successes = successes;
+        this.failures = failures;
+    }
+
+    /**
+     * After all in-flight messages have results, calculate appropriate {@link 
Relationship}.
+     */
+    public Relationship reconcile() {
+        while (futures.size() > (successes.size() + failures.size())) {
+            try {
+                ApiFutures.allAsList(futures).get();
+            } catch (InterruptedException | ExecutionException e) {
+                logger.error("Error while reconciling PublishGCPubSub send", 
e);

Review Comment:
   Recommend adjusting the wording slightly:
   ```suggestion
                   logger.error("Failed to reconcile PubSub send operation 
status", e);
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -82,12 +101,63 @@
         description = "Attributes to be set for the outgoing Google Cloud 
PubSub message", expressionLanguageScope = 
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
 @WritesAttributes({
         @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = 
MESSAGE_ID_DESCRIPTION),
+        @WritesAttribute(attribute = RECORDS_ATTRIBUTE, description = 
RECORDS_DESCRIPTION),
         @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = 
TOPIC_NAME_DESCRIPTION)
 })
 @SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"The entirety of the FlowFile's content "
         + "will be read into memory to be sent as a PubSub message.")
 public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
     private static final List<String> REQUIRED_PERMISSIONS = 
Collections.singletonList("pubsub.topics.publish");
+    private static final String TRANSIT_URI_FORMAT_STRING = "gcp://%s";
+
+    public static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Input Batch Size")
+            .displayName("Input Batch Size")
+            .description("Maximum number of FlowFiles processed for each 
Processor invocation")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NUMBER_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+    public static final PropertyDescriptor CONTENT_INPUT_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Content Input Strategy")
+            .displayName("Content Input Strategy")

Review Comment:
   Given the options and behavior, what do you think of naming this property 
`Message Derivation Strategy` or `Message Generation Strategy`? Something that 
indicates the property controls how the Processor creates messages from 
incoming FlowFiles.



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.publish;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.rpc.UnavailableException;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.gcp.pubsub.PubSubAttributes;
+import org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Tracking of an interaction from NiFi {@link PublishGCPubSub} processor to 
Google PubSub endpoint.
+ */
+public class FlowFileResult {
+    private static final Logger logger = 
LoggerFactory.getLogger(FlowFileResult.class);
+
+    private final FlowFile flowFile;
+    private final Map<String, String> attributes;
+    private final List<ApiFuture<String>> futures;
+    private final List<String> successes;
+    private final List<Throwable> failures;
+
+    public FlowFileResult(final FlowFile flowFile, final 
List<ApiFuture<String>> futures,
+                          final List<String> successes, final List<Throwable> 
failures) {
+        this.flowFile = flowFile;
+        this.attributes = new HashMap<>();

Review Comment:
   Recommend using `LinkedHashMap` for more deterministic ordering of 
attributes.



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.publish;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.rpc.UnavailableException;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.gcp.pubsub.PubSubAttributes;
+import org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Tracking of an interaction from NiFi {@link PublishGCPubSub} processor to 
Google PubSub endpoint.
+ */
+public class FlowFileResult {
+    private static final Logger logger = 
LoggerFactory.getLogger(FlowFileResult.class);
+
+    private final FlowFile flowFile;
+    private final Map<String, String> attributes;
+    private final List<ApiFuture<String>> futures;
+    private final List<String> successes;
+    private final List<Throwable> failures;
+
+    public FlowFileResult(final FlowFile flowFile, final 
List<ApiFuture<String>> futures,
+                          final List<String> successes, final List<Throwable> 
failures) {
+        this.flowFile = flowFile;
+        this.attributes = new HashMap<>();
+        this.futures = futures;
+        this.successes = successes;
+        this.failures = failures;
+    }
+
+    /**
+     * After all in-flight messages have results, calculate appropriate {@link 
Relationship}.
+     */
+    public Relationship reconcile() {
+        while (futures.size() > (successes.size() + failures.size())) {
+            try {
+                ApiFutures.allAsList(futures).get();
+            } catch (InterruptedException | ExecutionException e) {
+                logger.error("Error while reconciling PublishGCPubSub send", 
e);
+            }
+        }
+        if (futures.size() == successes.size()) {
+            if (futures.size() == 1) {
+                attributes.put(PubSubAttributes.MESSAGE_ID_ATTRIBUTE, 
successes.iterator().next());
+            } else {
+                attributes.put(PubSubAttributes.RECORDS_ATTRIBUTE, 
Integer.toString(futures.size()));
+            }
+        }
+        return RelationshipMapper.toRelationship(failures);
+    }
+
+    public FlowFile getFlowFile() {
+        return flowFile;
+    }
+
+    public Map<String, String> getAttributes() {
+        return attributes;
+    }
+
+    /**
+     * Logic to derive an appropriate {@link Relationship} from the feedback 
provided by the client library.
+     * <p>
+     * Each {@link com.google.pubsub.v1.PubsubMessage} is associated with a 
{@link NiFiApiFutureCallback} at time of
+     * submission to the client library.  This callback allows the client 
library to convey information to the caller
+     * about the result of the (asynchronous) send.  If a send fails, an 
appropriate exception is conveyed, providing
+     * detail about the send failure; otherwise a message id (provided by the 
service) is supplied.
+     * <p>
+     * Types of exceptions might be classified into "retryable" (another send 
may be attempted) or non-retryable.
+     */
+    private static class RelationshipMapper {
+
+        private static Relationship toRelationship(final List<Throwable> 
failures) {
+            Relationship relationship = PublishGCPubSub.REL_SUCCESS;
+            boolean isRetry = false;
+            boolean isFailure = false;
+            for (final Throwable failure : failures) {
+                if (isRetryException(failure)) {
+                    isRetry = true;
+                } else {
+                    isFailure = true;
+                    break;
+                }
+            }
+            if (isFailure) {
+                relationship = PublishGCPubSub.REL_FAILURE;
+            } else if (isRetry) {
+                relationship = PublishGCPubSub.REL_RETRY;
+            }
+            return relationship;
+        }
+
+        /**
+         * Retryable exceptions indicate transient conditions; another send 
attempt might succeed.
+         */
+        private static final Collection<Class<? extends Throwable>> 
RETRY_EXCEPTIONS = Collections.singleton(

Review Comment:
   This static member should be moved before the `toRelationship` method to 
follow standard ordering conventions.



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java:
##########
@@ -310,14 +468,23 @@ private Map<String, String> 
getDynamicAttributesMap(ProcessContext context, Flow
     }
 
     private Publisher.Builder getPublisherBuilder(ProcessContext context) {
-        final Long batchSize = context.getProperty(BATCH_SIZE).asLong();
+        final Long batchSizeThreshold = 
context.getProperty(BATCH_SIZE_THRESHOLD).asLong();
+        final long batchBytesThreshold = 
context.getProperty(BATCH_BYTES_THRESHOLD).asDataSize(DataUnit.B).longValue();
+        final Long batchDelayThreshold = 
context.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String endpoint = context.getProperty(API_ENDPOINT).getValue();
 
-        return Publisher.newBuilder(getTopicName(context))
+        final Publisher.Builder publisherBuilder = 
Publisher.newBuilder(getTopicName(context))
                 
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
                 .setChannelProvider(getTransportChannelProvider(context))
-                .setBatchingSettings(BatchingSettings.newBuilder()
-                .setElementCountThreshold(batchSize)
+                .setEndpoint(endpoint);
+
+        publisherBuilder.setBatchingSettings(BatchingSettings.newBuilder()
+                .setElementCountThreshold(batchSizeThreshold)
+                .setRequestByteThreshold(batchBytesThreshold)
+                .setDelayThreshold(Duration.ofMillis(batchDelayThreshold))
+                //.setFlowControlSettings(null)

Review Comment:
   It looks like this commented line can be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to