This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new be7233d  NIFI-8304 Changed TestListenTCPRecord to loop until files 
found
be7233d is described below

commit be7233d252e4b06a6787940cc0983fec4e0a77b1
Author: exceptionfactory <[email protected]>
AuthorDate: Tue Mar 16 21:12:52 2021 -0500

    NIFI-8304 Changed TestListenTCPRecord to loop until files found
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #4904.
---
 .../processors/standard/TestListenTCPRecord.java   | 49 +++++++++++++---------
 1 file changed, 30 insertions(+), 19 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
index e2e5c74..325b951 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
@@ -18,11 +18,10 @@ package org.apache.nifi.processors.standard;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.UncheckedIOException;
 import java.net.Socket;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Optional;
 import javax.net.ssl.SSLContext;
 
 import org.apache.nifi.json.JsonTreeReader;
@@ -36,6 +35,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.LogMessage;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -45,6 +45,8 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestListenTCPRecord {
     static final String SCHEMA_TEXT = "{\n" +
@@ -64,6 +66,10 @@ public class TestListenTCPRecord {
             "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", 
\"message\" : \"This is a test 3\"}" +
             "]";
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TestListenTCPRecord.class);
+
+    private static final long TEST_TIMEOUT = 30000;
+
     private static final String LOCALHOST = "localhost";
 
     private static final String SSL_CONTEXT_IDENTIFIER = 
SSLContextService.class.getName();
@@ -113,8 +119,8 @@ public class TestListenTCPRecord {
         runner.assertValid();
     }
 
-    @Test
-    public void testOneRecordPerFlowFile() throws IOException, 
InterruptedException {
+    @Test(timeout = TEST_TIMEOUT)
+    public void testRunOneRecordPerFlowFile() throws IOException, 
InterruptedException {
         runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "1");
 
         run(3, null);
@@ -130,8 +136,8 @@ public class TestListenTCPRecord {
         }
     }
 
-    @Test
-    public void testMultipleRecordsPerFlowFileLessThanBatchSize() throws 
IOException, InterruptedException {
+    @Test(timeout = TEST_TIMEOUT)
+    public void testRunMultipleRecordsPerFlowFileLessThanBatchSize() throws 
IOException, InterruptedException {
         runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "5");
 
         run(1, null);
@@ -149,8 +155,8 @@ public class TestListenTCPRecord {
         Assert.assertTrue(content.contains("This is a test " + 3));
     }
 
-    @Test
-    public void testTLSClientAuthRequiredAndClientCertProvided() throws 
InitializationException, IOException, InterruptedException {
+    @Test(timeout = TEST_TIMEOUT)
+    public void testRunClientAuthRequired() throws InitializationException, 
IOException, InterruptedException {
         runner.setProperty(ListenTCPRecord.CLIENT_AUTH, 
ClientAuth.REQUIRED.name());
         enableSslContextService(keyStoreSslContext);
 
@@ -166,8 +172,8 @@ public class TestListenTCPRecord {
         Assert.assertTrue(content.contains("This is a test " + 3));
     }
 
-    @Test
-    public void testTLSClientAuthNoneAndClientCertNotProvided() throws 
InitializationException, IOException, InterruptedException {
+    @Test(timeout = TEST_TIMEOUT)
+    public void testRunClientAuthNone() throws InitializationException, 
IOException, InterruptedException {
         runner.setProperty(ListenTCPRecord.CLIENT_AUTH, 
ClientAuth.NONE.name());
         enableSslContextService(keyStoreSslContext);
 
@@ -190,26 +196,31 @@ public class TestListenTCPRecord {
         // Run Processor and start listener without shutting down
         runner.run(1, false, true);
 
-        final AtomicBoolean completed = new AtomicBoolean(false);
         final Thread thread = new Thread(() -> {
             try (final Socket socket = getSocket(port, sslContext)) {
                 final OutputStream outputStream = socket.getOutputStream();
                 outputStream.write(DATA.getBytes(StandardCharsets.UTF_8));
                 outputStream.flush();
-                completed.set(true);
             } catch (final IOException e) {
-                throw new UncheckedIOException(e);
+                LOGGER.error("Failed Sending Records to Port [{}]", port, e);
             }
         });
         thread.start();
 
-        // Wait for Send Completion
-        completed.compareAndSet(true, false);
+        // Run Processor until success leveraging test method timeouts for 
failure status
+        int iterations = 0;
+        while (getSuccessCount() < expectedTransferred) {
+            runner.run(1, false, false);
+            iterations++;
+
+            final Optional<LogMessage> firstErrorMessage = 
runner.getLogger().getErrorMessages().stream().findFirst();
+            Assert.assertNull(firstErrorMessage.orElse(null));
+        }
+        LOGGER.info("Completed after iterations [{}]", iterations);
+    }
 
-        // Run Processor for expected FlowFiles with an additional run to 
ensure completion
-        final int iterations = expectedTransferred + 1;
-        runner.run(iterations, true, false);
-        runner.assertTransferCount(ListenTCPRecord.REL_SUCCESS, 
expectedTransferred);
+    private int getSuccessCount() {
+        return 
runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS).size();
     }
 
     private Socket getSocket(final int port, final SSLContext sslContext) 
throws IOException {

Reply via email to