greyp9 commented on a change in pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#discussion_r731958660



##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public abstract class EventBatcher<E extends ByteArrayMessage> {
+
+    public static final int POLL_TIMEOUT_MS = 20;
+
+    private volatile BlockingQueue<E> events;
+    private volatile BlockingQueue<E> errorEvents;
+    private final ComponentLog logger;
+
+    public EventBatcher(final ComponentLog logger, final BlockingQueue events, 
final BlockingQueue errorEvents) {
+        this.logger = logger;
+        this.events = events;
+        this.errorEvents = errorEvents;
+    }
+
+    /**
+     * Batches together up to the batchSize events. Events are grouped 
together based on a batch key which
+     * by default is the sender of the event, but can be override by 
sub-classes.

Review comment:
       can be overridden ...

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/listen/EventBatcherTest.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.event.StandardNetworkEventFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.apache.nifi.util.MockProcessSession;
+import org.apache.nifi.util.SharedSessionState;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.mockito.Mockito.mock;
+
+public class EventBatcherTest {
+
+    static final String MESSAGE_DATA_1 = "some message data";
+    static final String MESSAGE_DATA_2 = "some more data";
+    static Processor processor;
+    static final AtomicLong idGenerator = new AtomicLong(0L);
+    static final ComponentLog logger = mock(ComponentLog.class);
+    static BlockingQueue events;
+    static BlockingQueue errorEvents;
+    static EventBatcher batcher;
+    static MockProcessSession session;
+    static StandardNetworkEventFactory eventFactory;
+
+    @Before
+    public void setUp() {
+        processor = new SimpleProcessor();
+        events = new LinkedBlockingQueue<>();
+        errorEvents = new LinkedBlockingQueue<>();
+        batcher = new EventBatcher<ByteArrayMessage>(logger, events, 
errorEvents) {
+            @Override
+            protected String getBatchKey(ByteArrayMessage event) {
+                return event.getSender();
+            }
+        };
+        session = new MockProcessSession(new SharedSessionState(processor, 
idGenerator), Mockito.mock(Processor.class));
+        eventFactory = new StandardNetworkEventFactory();
+    }
+
+    @Test
+    public void testGetBatches() throws InterruptedException {
+        String sender1 = new 
InetSocketAddress(NetworkUtils.getAvailableTcpPort()).toString();
+        String sender2 = new 
InetSocketAddress(NetworkUtils.getAvailableTcpPort()).toString();
+        final Map<String, String> sender1Metadata = 
EventFactoryUtil.createMapWithSender(sender1.toString());

Review comment:
       toString() shouldn't be needed

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
##########
@@ -85,4 +86,70 @@ public ValidationResult validate(String subject, String 
input, ValidationContext
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The port to listen on for communication.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("Specifies the character set of the received data.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor RECV_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Receive Buffer Size")
+            .description("The size of each buffer used to receive messages. 
Adjust this value appropriately based on the expected size of the " +
+                    "incoming messages.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("65507 B")

Review comment:
       Since this value is "off the beaten path", an explanatory comment might 
be nice here.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPDecoder.java
##########
@@ -42,6 +43,13 @@ public RELPDecoder(final Charset charset) {
         this(charset, new ByteArrayOutputStream(4096));
     }
 
+    /**
+     * @param charset the charset to decode bytes from the RELP frame
+     */
+    public RELPDecoder(final int bufferSize, final Charset charset) {

Review comment:
       Since this constructor is newly added, it might be nice if Charset were 
the first parameter, to align with the other constructors.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoderTest.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.standard.relp.event.RELPMessage;
+import org.apache.nifi.util.MockComponentLog;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+class RELPFrameDecoderTest {
+
+    final ComponentLog logger = new 
MockComponentLog(this.getClass().getSimpleName(), this);
+
+    public static final String OPEN_FRAME_DATA = 
"relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";
+    public static final String SYSLOG_FRAME_DATA = "this is a syslog message 
here";
+
+    static final RELPFrame OPEN_FRAME = new RELPFrame.Builder()
+            .txnr(1)
+            .command("open")
+            .dataLength(OPEN_FRAME_DATA.length())
+            .data(OPEN_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
+            .build();
+
+    static final RELPFrame SYSLOG_FRAME = new RELPFrame.Builder()
+            .txnr(2)
+            .command("syslog")
+            .dataLength(SYSLOG_FRAME_DATA.length())
+            .data(SYSLOG_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
+            .build();
+
+    static final RELPFrame CLOSE_FRAME = new RELPFrame.Builder()
+            .txnr(3)
+            .command("close")
+            .dataLength(0)
+            .data(new byte[0])
+            .build();
+
+    @Test
+    void testDecodeRELPEvents() throws IOException {
+        final List<RELPFrame> frames = getFrames(5);
+        ByteBufOutputStream eventBytes = new 
ByteBufOutputStream(Unpooled.buffer());
+        sendFrames(frames, eventBytes);
+        EmbeddedChannel channel = new EmbeddedChannel(new 
RELPFrameDecoder(logger, Charset.defaultCharset()));

Review comment:
       The usage of Charset.defaultCharset() here conflicts with the RELPFrame 
instances above, whose data is encoded with StandardCharsets.UTF_8.  You could 
generate the frames dynamically, or pick a static Charset.
   
   It might be interesting to pick a few charsets from 
Charset.availableCharsets(), and iterate through those.  This is assuming that 
we are trying to handle RELP input from a machine running in the context of a 
non-standard locale.

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
##########
@@ -85,4 +86,70 @@ public ValidationResult validate(String subject, String 
input, ValidationContext
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The port to listen on for communication.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("Specifies the character set of the received data.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor RECV_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Receive Buffer Size")
+            .description("The size of each buffer used to receive messages. 
Adjust this value appropriately based on the expected size of the " +
+                    "incoming messages.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("65507 B")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Size of Socket Buffer")
+            .description("The maximum size of the socket buffer that should be 
used. This is a suggestion to the Operating System " +
+                    "to indicate how big the socket buffer should be. If this 
value is set too low, the buffer may fill up before " +
+                    "the data can be read, and incoming data will be dropped.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Size of Message Queue")
+            .description("The maximum size of the internal queue used to 
buffer messages being transferred from the underlying channel to the processor. 
" +
+                    "Setting this value higher allows more messages to be 
buffered in memory during surges of incoming messages, but increases the total 
" +

Review comment:
       I read this at first as suggesting that the memory was never reclaimed 
when the message surge abated.  LinkedBlockingQueue appears to dynamically 
expand and contract as needed.  Suggest a wording tweak:
   
   ~ "... but increases the memory used by the processor during these surges."
   
   There are a couple of other usages of this wording that should be kept in 
sync.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
##########
@@ -157,90 +165,71 @@ public void testRunBatching() throws IOException {
 
     @Test
     public void testRunMutualTls() throws IOException, TlsException, 
InitializationException {
+
+
         final String serviceIdentifier = SSLContextService.class.getName();
-        
Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
+        when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
         final SSLContext sslContext = 
SslContextUtils.createKeyStoreSslContext();
-        Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
+        when(sslContextService.createContext()).thenReturn(sslContext);
         runner.addControllerService(serviceIdentifier, sslContextService);
         runner.enableControllerService(sslContextService);
 
         runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier);
+        runner.setProperty(ListenRELP.CLIENT_AUTH, ClientAuth.NONE.name());
 
         final int syslogFrames = 3;
         final List<RELPFrame> frames = getFrames(syslogFrames);
         run(frames, syslogFrames, syslogFrames, sslContext);
     }
 
-    @Test
-    public void testRunNoEventsAvailable() {
-        MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<>());
-        runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, 
Integer.toString(NetworkUtils.availablePort()));
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
-        runner.shutdown();
-    }
-
     @Test
     public void testBatchingWithDifferentSenders() {
-        final String sender1 = "sender1";
-        final String sender2 = "sender2";
-
-        final List<RELPEvent> mockEvents = new ArrayList<>();
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), 
responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), 
responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), 
responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), 
responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        String sender1 = "/192.168.1.50:55000";
+        String sender2 = "/192.168.1.50:55001";
+        String sender3 = "/192.168.1.50:55002";
+
+        final List<RELPMessage> mockEvents = new ArrayList<>();
+        mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), 
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), 
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), 
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender2, SYSLOG_FRAME.getData(), 
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender3, SYSLOG_FRAME.getData(), 
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender3, SYSLOG_FRAME.getData(), 
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
 
         MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
         runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, 
Integer.toString(NetworkUtils.availablePort()));
-        runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
+        runner.setProperty(AbstractListenEventBatchingProcessor.PORT, 
Integer.toString(NetworkUtils.availablePort()));
+        runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
 
         runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
+        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 3);
         runner.shutdown();
     }
 
     private void run(final List<RELPFrame> frames, final int flowFiles, final 
int responses, final SSLContext sslContext)

Review comment:
       Is the `responses` parameter no longer needed?

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
##########
@@ -45,32 +39,47 @@
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.nifi.web.util.ssl.SslContextUtils;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import javax.net.ssl.SSLContext;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.when;
+
 @RunWith(MockitoJUnitRunner.class)
 public class TestListenRELP {
 
     public static final String OPEN_FRAME_DATA = 
"relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";
     public static final String SYSLOG_FRAME_DATA = "this is a syslog message 
here";
 
+    private static final String LOCALHOST = "localhost";
+    private static final Charset CHARSET = StandardCharsets.US_ASCII;
+    private static final Duration SENDER_TIMEOUT = Duration.ofSeconds(30);
+
     static final RELPFrame OPEN_FRAME = new RELPFrame.Builder()
             .txnr(1)
             .command("open")
             .dataLength(OPEN_FRAME_DATA.length())
-            .data(OPEN_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
+            .data(OPEN_FRAME_DATA.getBytes(CHARSET))

Review comment:
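       This method of initializing the test frames only works with charsets that 
use one byte per character (which may be good enough): dataLength is taken from 
String.length(), which counts chars rather than encoded bytes, while the data 
itself comes from getBytes(CHARSET).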

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/listen/EventBatcherTest.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.event.StandardNetworkEventFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.apache.nifi.util.MockProcessSession;
+import org.apache.nifi.util.SharedSessionState;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.mockito.Mockito.mock;
+
+public class EventBatcherTest {
+
+    static final String MESSAGE_DATA_1 = "some message data";

Review comment:
       Are these members static because they might be shared among multiple 
potential tests?

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
##########
@@ -157,90 +165,71 @@ public void testRunBatching() throws IOException {
 
     @Test
     public void testRunMutualTls() throws IOException, TlsException, 
InitializationException {
+
+
         final String serviceIdentifier = SSLContextService.class.getName();
-        
Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
+        when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
         final SSLContext sslContext = 
SslContextUtils.createKeyStoreSslContext();
-        Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
+        when(sslContextService.createContext()).thenReturn(sslContext);
         runner.addControllerService(serviceIdentifier, sslContextService);
         runner.enableControllerService(sslContextService);
 
         runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier);
+        runner.setProperty(ListenRELP.CLIENT_AUTH, ClientAuth.NONE.name());
 
         final int syslogFrames = 3;
         final List<RELPFrame> frames = getFrames(syslogFrames);
         run(frames, syslogFrames, syslogFrames, sslContext);
     }
 
-    @Test
-    public void testRunNoEventsAvailable() {
-        MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<>());
-        runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, 
Integer.toString(NetworkUtils.availablePort()));
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
-        runner.shutdown();
-    }
-
     @Test
     public void testBatchingWithDifferentSenders() {
-        final String sender1 = "sender1";
-        final String sender2 = "sender2";
-
-        final List<RELPEvent> mockEvents = new ArrayList<>();
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), 
responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), 
responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), 
responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), 
responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        String sender1 = "/192.168.1.50:55000";
+        String sender2 = "/192.168.1.50:55001";
+        String sender3 = "/192.168.1.50:55002";
+
+        final List<RELPMessage> mockEvents = new ArrayList<>();
+        mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), 
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), 
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender1, SYSLOG_FRAME.getData(), 
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender2, SYSLOG_FRAME.getData(), 
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender3, SYSLOG_FRAME.getData(), 
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender3, SYSLOG_FRAME.getData(), 
SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
 
         MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
         runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, 
Integer.toString(NetworkUtils.availablePort()));
-        runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
+        runner.setProperty(AbstractListenEventBatchingProcessor.PORT, 
Integer.toString(NetworkUtils.availablePort()));
+        runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
 
         runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
+        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 3);
         runner.shutdown();
     }
 
     private void run(final List<RELPFrame> frames, final int flowFiles, final 
int responses, final SSLContext sslContext)
             throws IOException {
 
         final int port = NetworkUtils.availablePort();
-        runner.setProperty(ListenRELP.PORT, Integer.toString(port));
+        runner.setProperty(AbstractListenEventBatchingProcessor.PORT, 
Integer.toString(port));
 
         // Run Processor and start Dispatcher without shutting down
         runner.run(1, false, true);
-
-        try (final Socket socket = getSocket(port, sslContext)) {
-            final OutputStream outputStream = socket.getOutputStream();
-            sendFrames(frames, outputStream);
-
-            // Run Processor for number of responses
-            runner.run(responses, false, false);
-
-            runner.assertTransferCount(ListenRELP.REL_SUCCESS, flowFiles);
-        } finally {
-            runner.shutdown();
-        }
+        final byte[] relpMessages = getRELPMessages(frames);
+        sendMessages(port, relpMessages, sslContext);
+        runner.run(flowFiles, false, false);

Review comment:
       Do you know why the runner is primed with runner.run(1, false, true) on 
line 216?  (I'm not familiar with that pattern.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to