exceptionfactory commented on a change in pull request #5790:
URL: https://github.com/apache/nifi/pull/5790#discussion_r813965750
##########
File path:
nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ITListenGRPC.java
##########
@@ -19,26 +19,57 @@
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
+import io.grpc.StatusRuntimeException;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringContains.containsString;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public class ITListenGRPC {
private static final String HOST = "localhost";
private static final String SOURCE_SYSTEM_UUID = "FAKE_UUID";
+ private static Map<String, String> getTruststoreProperties() {
+ final Map<String, String> props = new HashMap<>();
+ props.put(StandardSSLContextService.TRUSTSTORE.getName(),
"src/test/resources/truststore.jks");
+ props.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(),
"passwordpassword");
+ props.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
+ return props;
+ }
+
+ private static Map<String, String> getKeystoreProperties() {
+ final Map<String, String> properties = new HashMap<>();
+ properties.put(StandardSSLContextService.KEYSTORE.getName(),
"src/test/resources/keystore.jks");
+ properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(),
"passwordpassword");
+ properties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(),
"JKS");
+ return properties;
+ }
+
+ private static void useSSLContextService(final TestRunner controller,
final Map<String, String> sslProperties) {
+ final SSLContextService service = new StandardSSLContextService();
+ assertDoesNotThrow(() ->
controller.addControllerService("ssl-service", service, sslProperties));
+ controller.enableControllerService(service);
+
+ controller.setProperty(InvokeGRPC.PROP_SSL_CONTEXT_SERVICE,
"ssl-service");
+ }
+
Review comment:
Are these changes necessary?
##########
File path:
nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ITListenGRPC.java
##########
@@ -119,17 +151,67 @@ public void testOutOfSpaceRoundTrip() throws Exception {
}
}
- @Test(expected = io.grpc.StatusRuntimeException.class)
- public void testExceedMaxMessageSize() throws Exception {
+ @Test
+ public void testExceedMaxMessageSize() {
+ assertThrows(StatusRuntimeException.class, () -> {
+ final int randPort = TestGRPCClient.randomPort();
+ final ManagedChannel channel = TestGRPCClient.buildChannel(HOST,
randPort);
+ final FlowFileServiceGrpc.FlowFileServiceBlockingStub stub =
FlowFileServiceGrpc.newBlockingStub(channel);
+
+ final ListenGRPC listenGRPC = new ListenGRPC();
+ final TestRunner runner = TestRunners.newTestRunner(listenGRPC);
+ runner.setProperty(ListenGRPC.PROP_SERVICE_PORT,
String.valueOf(randPort));
+ // set max message size to 1 byte to force exception to be thrown.
+ runner.setProperty(ListenGRPC.PROP_MAX_MESSAGE_SIZE, "1B");
+
+ final ProcessContext processContext = runner.getProcessContext();
+ final ProcessSessionFactory processSessionFactory =
runner.getProcessSessionFactory();
+
+ try {
+ // start the server. The order of the following statements
shouldn't matter, because the
+ // startServer() method waits for a processSessionFactory to
be available to it.
+ listenGRPC.startServer(processContext);
+ listenGRPC.onTrigger(processContext, processSessionFactory);
+
+ final FlowFileRequest ingestFile = FlowFileRequest.newBuilder()
+ .putAttributes("FOO", "BAR")
+ .putAttributes(CoreAttributes.UUID.key(),
SOURCE_SYSTEM_UUID)
+ .setContent(ByteString.copyFrom("content".getBytes()))
+ .build();
+ // this should throw a runtime exception
+ final FlowFileReply reply = stub.send(ingestFile);
+ assertThat(reply.getResponseCode(),
equalTo(FlowFileReply.ResponseCode.SUCCESS));
+ assertThat(reply.getBody(), equalTo("FlowFile successfully
received."));
+
+ runner.assertTransferCount(ListenGRPC.REL_SUCCESS, 1);
+ final List<MockFlowFile> successFiles =
runner.getFlowFilesForRelationship(ListenGRPC.REL_SUCCESS);
+ assertThat(successFiles.size(), equalTo(1));
+ final MockFlowFile mockFlowFile = successFiles.get(0);
+ assertThat(mockFlowFile.getAttribute("FOO"), equalTo("BAR"));
+ assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_HOST),
equalTo("127.0.0.1"));
+
assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_USER_DN),
equalTo(FlowFileIngestServiceInterceptor.DEFAULT_FOUND_SUBJECT));
+
+ } finally {
+ // stop the server
+ listenGRPC.stopServer(processContext);
+ channel.shutdown();
+ }
+ });
+ }
+
+ @Test
+ public void testSecureOneWaySSL() throws Exception {
final int randPort = TestGRPCClient.randomPort();
- final ManagedChannel channel = TestGRPCClient.buildChannel(HOST,
randPort);
+ final Map<String, String> sslProperties = getTruststoreProperties();
+ final ManagedChannel channel = TestGRPCClient.buildChannel(HOST,
randPort, sslProperties);
final FlowFileServiceGrpc.FlowFileServiceBlockingStub stub =
FlowFileServiceGrpc.newBlockingStub(channel);
final ListenGRPC listenGRPC = new ListenGRPC();
final TestRunner runner = TestRunners.newTestRunner(listenGRPC);
+ runner.setAllowSynchronousSessionCommits(true);
runner.setProperty(ListenGRPC.PROP_SERVICE_PORT,
String.valueOf(randPort));
- // set max message size to 1 byte to force exception to be thrown.
- runner.setProperty(ListenGRPC.PROP_MAX_MESSAGE_SIZE, "1B");
+ runner.setProperty(ListenGRPC.PROP_USE_SECURE, "true");
+ useSSLContextService(runner, getKeystoreProperties());
Review comment:
Is this change necessary?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]