greyp9 commented on a change in pull request #5398: URL: https://github.com/apache/nifi/pull/5398#discussion_r731958660
########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen; + +import org.apache.nifi.event.transport.message.ByteArrayMessage; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.io.OutputStreamCallback; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +public abstract class EventBatcher<E extends ByteArrayMessage> { + + public static final int POLL_TIMEOUT_MS = 20; + + private volatile BlockingQueue<E> events; + private volatile BlockingQueue<E> errorEvents; + private final ComponentLog logger; + + public EventBatcher(final ComponentLog logger, final BlockingQueue events, final BlockingQueue errorEvents) { + this.logger = logger; + this.events = events; + this.errorEvents = errorEvents; + } + + /** + * Batches together up to the batchSize events. Events are grouped together based on a batch key which + * by default is the sender of the event, but can be override by sub-classes. Review comment: can be overridden ... ########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/listen/EventBatcherTest.java ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen; + +import org.apache.nifi.event.transport.message.ByteArrayMessage; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processor.util.listen.event.StandardNetworkEventFactory; +import org.apache.nifi.remote.io.socket.NetworkUtils; +import org.apache.nifi.util.MockProcessSession; +import org.apache.nifi.util.SharedSessionState; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + +import static org.mockito.Mockito.mock; + +public class EventBatcherTest { + + static final String MESSAGE_DATA_1 = "some message data"; + static final String MESSAGE_DATA_2 = "some more data"; + static Processor processor; + static final AtomicLong idGenerator = new AtomicLong(0L); + static final ComponentLog logger = mock(ComponentLog.class); + static BlockingQueue events; + static BlockingQueue errorEvents; + static EventBatcher batcher; + static MockProcessSession session; + static StandardNetworkEventFactory eventFactory; + + @Before + public void setUp() { + processor = new SimpleProcessor(); + events = new LinkedBlockingQueue<>(); + errorEvents = new LinkedBlockingQueue<>(); + batcher = new EventBatcher<ByteArrayMessage>(logger, events, errorEvents) { + @Override + protected String getBatchKey(ByteArrayMessage event) { + return event.getSender(); + } + }; + session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class)); + eventFactory = new StandardNetworkEventFactory(); + } + + @Test + public void testGetBatches() throws InterruptedException { + String sender1 = new InetSocketAddress(NetworkUtils.getAvailableTcpPort()).toString(); + String sender2 = new InetSocketAddress(NetworkUtils.getAvailableTcpPort()).toString(); + final Map<String, String> sender1Metadata = EventFactoryUtil.createMapWithSender(sender1.toString()); Review comment: toString() shouldn't be needed ########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java ########## @@ -85,4 +86,70 @@ public ValidationResult validate(String subject, String input, ValidationContext .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor + .Builder().name("Port") + .description("The port to listen on for communication.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("Specifies the character set of the received data.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("Receive Buffer Size") + .description("The size of each buffer used to receive messages. Adjust this value appropriately based on the expected size of the " + + "incoming messages.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("65507 B") Review comment: Since this value is "off the beaten path", an explanatory comment might be nice here. ########## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPDecoder.java ########## @@ -42,6 +43,13 @@ public RELPDecoder(final Charset charset) { this(charset, new ByteArrayOutputStream(4096)); } + /** + * @param charset the charset to decode bytes from the RELP frame + */ + public RELPDecoder(final int bufferSize, final Charset charset) { Review comment: Since this ctor is a net add, it might be nice if Charset was the first parameter, to align with the others. ########## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoderTest.java ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.frame; + +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processors.standard.relp.event.RELPMessage; +import org.apache.nifi.util.MockComponentLog; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +class RELPFrameDecoderTest { + + final ComponentLog logger = new MockComponentLog(this.getClass().getSimpleName(), this); + + public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog"; + public static final String SYSLOG_FRAME_DATA = "this is a syslog message here"; + + static final RELPFrame OPEN_FRAME = new RELPFrame.Builder() + .txnr(1) + .command("open") + .dataLength(OPEN_FRAME_DATA.length()) + .data(OPEN_FRAME_DATA.getBytes(StandardCharsets.UTF_8)) + .build(); + + static final RELPFrame SYSLOG_FRAME = new RELPFrame.Builder() + .txnr(2) + .command("syslog") + .dataLength(SYSLOG_FRAME_DATA.length()) + .data(SYSLOG_FRAME_DATA.getBytes(StandardCharsets.UTF_8)) + .build(); + + static final RELPFrame CLOSE_FRAME = new RELPFrame.Builder() + .txnr(3) + .command("close") + .dataLength(0) + .data(new byte[0]) + .build(); + + @Test + void testDecodeRELPEvents() throws IOException { + final List<RELPFrame> frames = getFrames(5); + ByteBufOutputStream eventBytes = new ByteBufOutputStream(Unpooled.buffer()); + sendFrames(frames, eventBytes); + EmbeddedChannel channel = new EmbeddedChannel(new RELPFrameDecoder(logger, Charset.defaultCharset())); Review comment: The usage of defaultCharset here conflicts with the specification of the RELPFrame instances above. You could generate the frames dynamically, or pick a static Charset. It might be interesting to pick a few charsets from Charset.availableCharsets(), and iterate through those. This is assuming that we are trying to handle RELP input from a machine running in the context of a non-standard locale. ########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java ########## @@ -85,4 +86,70 @@ public ValidationResult validate(String subject, String input, ValidationContext .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor + .Builder().name("Port") + .description("The port to listen on for communication.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("Specifies the character set of the received data.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("Receive Buffer Size") + .description("The size of each buffer used to receive messages. Adjust this value appropriately based on the expected size of the " + + "incoming messages.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("65507 B") + .required(true) + .build(); + public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("Max Size of Socket Buffer") + .description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System " + + "to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " + + "the data can be read, and incoming data will be dropped.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .required(true) + .build(); + public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder() + .name("Max Size of Message Queue") + .description("The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. " + + "Setting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total " + Review comment: I read this at first as suggesting that the memory was never reclaimed when the message surge abated. LinkedBlockingQueue appears to dynamically expand and contract as needed. Suggest a wording tweak: ~ "... but increases the memory used by the processor during these surges." There are a couple of other usages of this wording that should be kept in sync. ########## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java ########## @@ -157,90 +165,71 @@ public void testRunBatching() throws IOException { @Test public void testRunMutualTls() throws IOException, TlsException, InitializationException { + + final String serviceIdentifier = SSLContextService.class.getName(); - Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier); + when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier); final SSLContext sslContext = SslContextUtils.createKeyStoreSslContext(); - Mockito.when(sslContextService.createContext()).thenReturn(sslContext); + when(sslContextService.createContext()).thenReturn(sslContext); runner.addControllerService(serviceIdentifier, sslContextService); runner.enableControllerService(sslContextService); runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier); + runner.setProperty(ListenRELP.CLIENT_AUTH, ClientAuth.NONE.name()); final int syslogFrames = 3; final List<RELPFrame> frames = getFrames(syslogFrames); run(frames, syslogFrames, syslogFrames, sslContext); } - @Test - public void testRunNoEventsAvailable() { - MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<>()); - runner = TestRunners.newTestRunner(mockListenRELP); - runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort())); - - runner.run(); - runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0); - runner.shutdown(); - } - @Test public void testBatchingWithDifferentSenders() { - final String sender1 = "sender1"; - final String sender2 = "sender2"; - - final List<RELPEvent> mockEvents = new ArrayList<>(); - mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); - mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); - mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); - mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); + String sender1 = "/192.168.1.50:55000"; + String sender2 = "/192.168.1.50:55001"; + String sender3 = "/192.168.1.50:55002"; + + final List<RELPMessage> mockEvents = new ArrayList<>(); + mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); + mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); + mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); + mockEvents.add(new RELPMessage(sender2, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); + mockEvents.add(new RELPMessage(sender3, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); + mockEvents.add(new RELPMessage(sender3, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); MockListenRELP mockListenRELP = new MockListenRELP(mockEvents); runner = TestRunners.newTestRunner(mockListenRELP); - runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort())); - runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10"); + runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(NetworkUtils.availablePort())); + runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10"); runner.run(); - runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2); + runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 3); runner.shutdown(); } private void run(final List<RELPFrame> frames, final int flowFiles, final int responses, final SSLContext sslContext) Review comment: is responses parameter no longer needed? ########## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java ########## @@ -45,32 +39,47 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.apache.nifi.web.util.ssl.SslContextUtils; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import javax.net.ssl.SSLContext; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.when; + @RunWith(MockitoJUnitRunner.class) public class TestListenRELP { public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog"; public static final String SYSLOG_FRAME_DATA = "this is a syslog message here"; + private static final String LOCALHOST = "localhost"; + private static final Charset CHARSET = StandardCharsets.US_ASCII; + private static final Duration SENDER_TIMEOUT = Duration.ofSeconds(30); + static final RELPFrame OPEN_FRAME = new RELPFrame.Builder() .txnr(1) .command("open") .dataLength(OPEN_FRAME_DATA.length()) - .data(OPEN_FRAME_DATA.getBytes(StandardCharsets.UTF_8)) + .data(OPEN_FRAME_DATA.getBytes(CHARSET)) Review comment: This method of initializing the test frames only works with 1 byte per character charsets (which may be good enough). ########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/listen/EventBatcherTest.java ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen; + +import org.apache.nifi.event.transport.message.ByteArrayMessage; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processor.util.listen.event.StandardNetworkEventFactory; +import org.apache.nifi.remote.io.socket.NetworkUtils; +import org.apache.nifi.util.MockProcessSession; +import org.apache.nifi.util.SharedSessionState; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + +import static org.mockito.Mockito.mock; + +public class EventBatcherTest { + + static final String MESSAGE_DATA_1 = "some message data"; Review comment: Are these members static because they might be shared among multiple potential tests? ########## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java ########## @@ -157,90 +165,71 @@ public void testRunBatching() throws IOException { @Test public void testRunMutualTls() throws IOException, TlsException, InitializationException { + + final String serviceIdentifier = SSLContextService.class.getName(); - Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier); + when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier); final SSLContext sslContext = SslContextUtils.createKeyStoreSslContext(); - Mockito.when(sslContextService.createContext()).thenReturn(sslContext); + when(sslContextService.createContext()).thenReturn(sslContext); runner.addControllerService(serviceIdentifier, sslContextService); runner.enableControllerService(sslContextService); runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier); + runner.setProperty(ListenRELP.CLIENT_AUTH, ClientAuth.NONE.name()); final int syslogFrames = 3; final List<RELPFrame> frames = getFrames(syslogFrames); run(frames, syslogFrames, syslogFrames, sslContext); } - @Test - public void testRunNoEventsAvailable() { - MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<>()); - runner = TestRunners.newTestRunner(mockListenRELP); - runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort())); - - runner.run(); - runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0); - runner.shutdown(); - } - @Test public void testBatchingWithDifferentSenders() { - final String sender1 = "sender1"; - final String sender2 = "sender2"; - - final List<RELPEvent> mockEvents = new ArrayList<>(); - mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); - mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); - mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); - mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); + String sender1 = "/192.168.1.50:55000"; + String sender2 = "/192.168.1.50:55001"; + String sender3 = "/192.168.1.50:55002"; + + final List<RELPMessage> mockEvents = new ArrayList<>(); + mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); + mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); + mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); + mockEvents.add(new RELPMessage(sender2, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); + mockEvents.add(new RELPMessage(sender3, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); + mockEvents.add(new RELPMessage(sender3, SYSLOG_FRAME.getData(), SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); MockListenRELP mockListenRELP = new MockListenRELP(mockEvents); runner = TestRunners.newTestRunner(mockListenRELP); - runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort())); - runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10"); + runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(NetworkUtils.availablePort())); + runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10"); runner.run(); - runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2); + runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 3); runner.shutdown(); } private void run(final List<RELPFrame> frames, final int flowFiles, final int responses, final SSLContext sslContext) throws IOException { final int port = NetworkUtils.availablePort(); - runner.setProperty(ListenRELP.PORT, Integer.toString(port)); + runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(port)); // Run Processor and start Dispatcher without shutting down runner.run(1, false, true); - - try (final Socket socket = getSocket(port, sslContext)) { - final OutputStream outputStream = socket.getOutputStream(); - sendFrames(frames, outputStream); - - // Run Processor for number of responses - runner.run(responses, false, false); - - runner.assertTransferCount(ListenRELP.REL_SUCCESS, flowFiles); - } finally { - runner.shutdown(); - } + final byte[] relpMessages = getRELPMessages(frames); + sendMessages(port, relpMessages, sslContext); + runner.run(flowFiles, false, false); Review comment: Maybe you know why runner is primed in line 216? (not familiar with doing that) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org