This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new be7233d NIFI-8304 Changed TestListenTCPRecord to loop until files found
be7233d is described below
commit be7233d252e4b06a6787940cc0983fec4e0a77b1
Author: exceptionfactory <[email protected]>
AuthorDate: Tue Mar 16 21:12:52 2021 -0500
NIFI-8304 Changed TestListenTCPRecord to loop until files found
Signed-off-by: Pierre Villard <[email protected]>
This closes #4904.
---
.../processors/standard/TestListenTCPRecord.java | 49 +++++++++++++---------
1 file changed, 30 insertions(+), 19 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
index e2e5c74..325b951 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java
@@ -18,11 +18,10 @@ package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.UncheckedIOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Optional;
import javax.net.ssl.SSLContext;
import org.apache.nifi.json.JsonTreeReader;
@@ -36,6 +35,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.LogMessage;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -45,6 +45,8 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestListenTCPRecord {
static final String SCHEMA_TEXT = "{\n" +
@@ -64,6 +66,10 @@ public class TestListenTCPRecord {
"{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\",
\"message\" : \"This is a test 3\"}" +
"]";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TestListenTCPRecord.class);
+
+ private static final long TEST_TIMEOUT = 30000;
+
private static final String LOCALHOST = "localhost";
private static final String SSL_CONTEXT_IDENTIFIER =
SSLContextService.class.getName();
@@ -113,8 +119,8 @@ public class TestListenTCPRecord {
runner.assertValid();
}
- @Test
- public void testOneRecordPerFlowFile() throws IOException,
InterruptedException {
+ @Test(timeout = TEST_TIMEOUT)
+ public void testRunOneRecordPerFlowFile() throws IOException,
InterruptedException {
runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "1");
run(3, null);
@@ -130,8 +136,8 @@ public class TestListenTCPRecord {
}
}
- @Test
- public void testMultipleRecordsPerFlowFileLessThanBatchSize() throws
IOException, InterruptedException {
+ @Test(timeout = TEST_TIMEOUT)
+ public void testRunMultipleRecordsPerFlowFileLessThanBatchSize() throws
IOException, InterruptedException {
runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "5");
run(1, null);
@@ -149,8 +155,8 @@ public class TestListenTCPRecord {
Assert.assertTrue(content.contains("This is a test " + 3));
}
- @Test
- public void testTLSClientAuthRequiredAndClientCertProvided() throws
InitializationException, IOException, InterruptedException {
+ @Test(timeout = TEST_TIMEOUT)
+ public void testRunClientAuthRequired() throws InitializationException,
IOException, InterruptedException {
runner.setProperty(ListenTCPRecord.CLIENT_AUTH,
ClientAuth.REQUIRED.name());
enableSslContextService(keyStoreSslContext);
@@ -166,8 +172,8 @@ public class TestListenTCPRecord {
Assert.assertTrue(content.contains("This is a test " + 3));
}
- @Test
- public void testTLSClientAuthNoneAndClientCertNotProvided() throws
InitializationException, IOException, InterruptedException {
+ @Test(timeout = TEST_TIMEOUT)
+ public void testRunClientAuthNone() throws InitializationException,
IOException, InterruptedException {
runner.setProperty(ListenTCPRecord.CLIENT_AUTH,
ClientAuth.NONE.name());
enableSslContextService(keyStoreSslContext);
@@ -190,26 +196,31 @@ public class TestListenTCPRecord {
// Run Processor and start listener without shutting down
runner.run(1, false, true);
- final AtomicBoolean completed = new AtomicBoolean(false);
final Thread thread = new Thread(() -> {
try (final Socket socket = getSocket(port, sslContext)) {
final OutputStream outputStream = socket.getOutputStream();
outputStream.write(DATA.getBytes(StandardCharsets.UTF_8));
outputStream.flush();
- completed.set(true);
} catch (final IOException e) {
- throw new UncheckedIOException(e);
+ LOGGER.error("Failed Sending Records to Port [{}]", port, e);
}
});
thread.start();
- // Wait for Send Completion
- completed.compareAndSet(true, false);
+ // Run Processor until success leveraging test method timeouts for
failure status
+ int iterations = 0;
+ while (getSuccessCount() < expectedTransferred) {
+ runner.run(1, false, false);
+ iterations++;
+
+ final Optional<LogMessage> firstErrorMessage =
runner.getLogger().getErrorMessages().stream().findFirst();
+ Assert.assertNull(firstErrorMessage.orElse(null));
+ }
+ LOGGER.info("Completed after iterations [{}]", iterations);
+ }
- // Run Processor for expected FlowFiles with an additional run to
ensure completion
- final int iterations = expectedTransferred + 1;
- runner.run(iterations, true, false);
- runner.assertTransferCount(ListenTCPRecord.REL_SUCCESS,
expectedTransferred);
+ private int getSuccessCount() {
+ return
runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS).size();
}
private Socket getSocket(final int port, final SSLContext sslContext)
throws IOException {