[GitHub] [nifi] exceptionfactory commented on a diff in pull request #7274: NIFI-11553 - additional configurability for GCP processors; PublishGC…

2023-06-06 Thread via GitHub


exceptionfactory commented on code in PR #7274:
URL: https://github.com/apache/nifi/pull/7274#discussion_r1220305463


##
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/NiFiApiFutureCallback.java:
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.publish;
+
+import com.google.api.core.ApiFutureCallback;
+
+import java.util.List;
+
+/**
+ * Specialization of {@link ApiFutureCallback} used to track Google PubSub 
send results.  Failure
+ * exceptions are captured to facilitate FlowFile routing decisions.
+ */
+public class NiFiApiFutureCallback implements ApiFutureCallback {

Review Comment:
   Rather than using `NiFi` as the prefix, recommend something more specific to 
the function, such as `TrackedApiFutureCallback`



##
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java:
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.UnavailableException;
+import com.google.cloud.pubsub.v1.Publisher;
+import io.grpc.Status;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processor.ProcessContext;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.processors.gcp.pubsub.publish.ContentInputStrategy;
+import org.apache.nifi.processors.gcp.pubsub.publish.NiFiApiFutureCallback;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PublishGCPubSubTest {
+
+private Throwable throwable;
+private Publisher publisherMock;
+private TestRunner runner;
+
+@BeforeEach
+void setRunner() {
+throwable = null;
+publisherMock = mock(Publisher.class);
+runner = TestRunners.newTestRunner(new PublishGCPubSub() {
+@Override
+@OnScheduled
+public void onScheduled(ProcessContext context) {
+publisher = publisherMock;
+}
+
+@Override
+protected void addCallback(ApiFuture apiFuture, 
ApiFutureCallback callback, Executor executor) {
+if (callback instanceof NiFiApiFutureCallback) {
+final NiFiApiFutureCallback niFiApiFutureCallback = 
(NiFiApiFutureCallback) callback;
+if (throwable == null) {
+

[GitHub] [nifi] exceptionfactory commented on a diff in pull request #7274: NIFI-11553 - additional configurability for GCP processors; PublishGC…

2023-05-25 Thread via GitHub


exceptionfactory commented on code in PR #7274:
URL: https://github.com/apache/nifi/pull/7274#discussion_r1199202716


##
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/ContentInputStrategy.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.publish;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Strategy for publishing data to GCP via PublishGCPubSub 
processor.
+ */
+public enum ContentInputStrategy implements DescribedValue {
+FLOWFILE_ORIENTED("FlowFile Oriented",
+"Each incoming FlowFile is sent as a Google Cloud PubSub message"),
+RECORD_ORIENTED("FlowFile Record Oriented",

Review Comment:
   Should the display name be `Record Oriented`?



##
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.lite;
+
+import org.apache.nifi.controller.ControllerService;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class PublishGCPubSubLiteTest {
+
+private TestRunner runner;
+
+@BeforeEach
+void setRunner() {
+runner = TestRunners.newTestRunner(PublishGCPubSubLite.class);
+}
+
+@Test
+void testPropertyDescriptors() throws InitializationException {
+runner.assertNotValid();
+
+final ControllerService controllerService = new 
GCPCredentialsControllerService();
+final String controllerServiceId = 
GCPCredentialsControllerService.class.getSimpleName();
+runner.addControllerService(controllerServiceId, controllerService);
+runner.enableControllerService(controllerService);
+//runner.setProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, 
controllerServiceId);

Review Comment:
   It looks like this commented-out line should be removed.



##
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java:
##
@@ -36,16 +37,47 @@
 
 public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor 
implements VerifiableProcessor {
 
-public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+public static final PropertyDescriptor BATCH_SIZE_THRESHOLD = new 
PropertyDescriptor.Builder()
 .name("gcp-pubsub-publish-batch-size")
-.displayName("Batch Size")
+.displayName("Batch Size Threshold")
 .description("Indicates the number of messages the cloud service 
should bundle together in a batch. If not set and left empty, only one message 
" +
 "will be used in a batch")
 .required(true)
 .defaultValue("15")
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
 .build();
 
+public static final PropertyDescriptor BATCH_BYTES_THRESHOLD = new 
PropertyDescriptor.Builder()
+.name("gcp-batch-bytes")