[GitHub] [nifi] exceptionfactory commented on a diff in pull request #7274: NIFI-11553 - additional configurability for GCP processors; PublishGC…
exceptionfactory commented on code in PR #7274: URL: https://github.com/apache/nifi/pull/7274#discussion_r1220305463 ## nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/NiFiApiFutureCallback.java: ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub.publish; + +import com.google.api.core.ApiFutureCallback; + +import java.util.List; + +/** + * Specialization of {@link ApiFutureCallback} used to track Google PubSub send results. Failure + * exceptions are captured to facilitate FlowFile routing decisions. + */ +public class NiFiApiFutureCallback implements ApiFutureCallback { Review Comment: Rather than using `NiFi` as the prefix, recommend something more specific to the function, such as `TrackedApiFutureCallback` ## nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java: ## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.UnavailableException; +import com.google.cloud.pubsub.v1.Publisher; +import io.grpc.Status; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.json.JsonTreeReader; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService; +import org.apache.nifi.processors.gcp.pubsub.publish.ContentInputStrategy; +import org.apache.nifi.processors.gcp.pubsub.publish.NiFiApiFutureCallback; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.Executor; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.mock; +import static 
org.mockito.Mockito.when; + +public class PublishGCPubSubTest { + +private Throwable throwable; +private Publisher publisherMock; +private TestRunner runner; + +@BeforeEach +void setRunner() { +throwable = null; +publisherMock = mock(Publisher.class); +runner = TestRunners.newTestRunner(new PublishGCPubSub() { +@Override +@OnScheduled +public void onScheduled(ProcessContext context) { +publisher = publisherMock; +} + +@Override +protected void addCallback(ApiFuture apiFuture, ApiFutureCallback callback, Executor executor) { +if (callback instanceof NiFiApiFutureCallback) { +final NiFiApiFutureCallback niFiApiFutureCallback = (NiFiApiFutureCallback) callback; +if (throwable == null) { +
[GitHub] [nifi] exceptionfactory commented on a diff in pull request #7274: NIFI-11553 - additional configurability for GCP processors; PublishGC…
exceptionfactory commented on code in PR #7274: URL: https://github.com/apache/nifi/pull/7274#discussion_r1199202716 ## nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/ContentInputStrategy.java: ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub.publish; + +import org.apache.nifi.components.DescribedValue; + +/** + * Strategy for publishing data to GCP via PublishGCPubSub processor. + */ +public enum ContentInputStrategy implements DescribedValue { +FLOWFILE_ORIENTED("FlowFile Oriented", +"Each incoming FlowFile is sent as a Google Cloud PubSub message"), +RECORD_ORIENTED("FlowFile Record Oriented", Review Comment: Should the display name be `Record Oriented`? ## nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java: ## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub.lite; + +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class PublishGCPubSubLiteTest { + +private TestRunner runner; + +@BeforeEach +void setRunner() { +runner = TestRunners.newTestRunner(PublishGCPubSubLite.class); +} + +@Test +void testPropertyDescriptors() throws InitializationException { +runner.assertNotValid(); + +final ControllerService controllerService = new GCPCredentialsControllerService(); +final String controllerServiceId = GCPCredentialsControllerService.class.getSimpleName(); +runner.addControllerService(controllerServiceId, controllerService); +runner.enableControllerService(controllerService); +//runner.setProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, controllerServiceId); Review Comment: It looks like this commented line should be removed. 
## nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java: ## @@ -36,16 +37,47 @@ public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor implements VerifiableProcessor { -public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() +public static final PropertyDescriptor BATCH_SIZE_THRESHOLD = new PropertyDescriptor.Builder() .name("gcp-pubsub-publish-batch-size") -.displayName("Batch Size") +.displayName("Batch Size Threshold") .description("Indicates the number of messages the cloud service should bundle together in a batch. If not set and left empty, only one message " + "will be used in a batch") .required(true) .defaultValue("15") .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); +public static final PropertyDescriptor BATCH_BYTES_THRESHOLD = new PropertyDescriptor.Builder() +.name("gcp-batch-bytes")