This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.16 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 1cf0f8cf877259e2acedede926a6546a8af69aef Author: Peter Gyori <peter.gyori....@gmail.com> AuthorDate: Tue May 10 18:51:08 2022 +0200 NIFI-10010: ListenTCP adds Certificate Subject and Issuer FlowFile attributes This closes #6032 Signed-off-by: David Handermann <exceptionfact...@apache.org> --- ...ByteArrayMessage.java => SslSessionStatus.java} | 29 ++++++------- .../event/transport/message/ByteArrayMessage.java | 13 +++++- .../netty/codec/SocketByteArrayMessageDecoder.java | 48 +++++++++++++++++++++- .../apache/nifi/processors/standard/ListenTCP.java | 26 +++++++++++- .../nifi/processors/standard/TestListenTCP.java | 19 ++++++--- 5 files changed, 110 insertions(+), 25 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/SslSessionStatus.java similarity index 57% copy from nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java copy to nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/SslSessionStatus.java index 00744ce06b..011cedb47f 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/SslSessionStatus.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -14,28 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.event.transport.message; -import org.apache.nifi.event.transport.NetworkEvent; +package org.apache.nifi.event.transport; -/** - * Byte Array Message with Sender - */ -public class ByteArrayMessage implements NetworkEvent { - private final byte[] message; +import javax.security.auth.x500.X500Principal; - private final String sender; +public class SslSessionStatus { + private final X500Principal subject; + private final X500Principal issuer; - public ByteArrayMessage(final byte[] message, final String sender) { - this.message = message; - this.sender = sender; + public SslSessionStatus(final X500Principal subject, final X500Principal issuer) { + this.subject = subject; + this.issuer = issuer; } - public byte[] getMessage() { - return message; + public X500Principal getSubject() { + return subject; } - public String getSender() { - return sender; + public X500Principal getIssuer() { + return issuer; } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java index 00744ce06b..9c6e53e42f 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java @@ -17,6 +17,7 @@ package org.apache.nifi.event.transport.message; import org.apache.nifi.event.transport.NetworkEvent; +import org.apache.nifi.event.transport.SslSessionStatus; /** * Byte Array Message with Sender @@ -25,10 +26,16 @@ public class ByteArrayMessage implements NetworkEvent { private final byte[] message; private final String sender; + private final SslSessionStatus sslSessionStatus; - public ByteArrayMessage(final byte[] message, final String sender) { + public ByteArrayMessage(final byte[] message, final String sender, final SslSessionStatus sslSessionStatus) { this.message = message; this.sender = sender; + this.sslSessionStatus = sslSessionStatus; + } + + public ByteArrayMessage(final byte[] message, final String sender) { + this(message, sender, null); } public byte[] getMessage() { @@ -38,4 +45,8 @@ public class ByteArrayMessage implements NetworkEvent { public String getSender() { return sender; } + + public SslSessionStatus getSslSessionStatus() { + return sslSessionStatus; + } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java index ab23dff80d..c7818c8b00 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/SocketByteArrayMessageDecoder.java @@ -16,17 +16,30 @@ */ package org.apache.nifi.event.transport.netty.codec; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.ssl.SslHandler; +import org.apache.nifi.event.transport.SslSessionStatus; import org.apache.nifi.event.transport.message.ByteArrayMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; +import javax.security.auth.x500.X500Principal; import java.net.InetSocketAddress; +import java.security.cert.Certificate; +import java.security.cert.X509Certificate; import java.util.List; +import java.util.Map; /** * Message Decoder for bytes received from Socket Channels */ public class SocketByteArrayMessageDecoder extends MessageToMessageDecoder<byte[]> { + private static final Logger logger = LoggerFactory.getLogger(SocketByteArrayMessageDecoder.class); + /** * Decode bytes to Byte Array Message with remote address from Channel.remoteAddress() * @@ -38,7 +51,40 @@ public class SocketByteArrayMessageDecoder extends MessageToMessageDecoder<byte[ protected void decode(final ChannelHandlerContext channelHandlerContext, final byte[] bytes, final List<Object> decoded) { final InetSocketAddress remoteAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress(); final String address = remoteAddress.getHostString(); - final ByteArrayMessage message = new ByteArrayMessage(bytes, address); + + final SslSessionStatus sslSessionStatus = getSslSessionStatus(channelHandlerContext); + final ByteArrayMessage message = new ByteArrayMessage(bytes, address, sslSessionStatus); + decoded.add(message); } + + private SslSessionStatus getSslSessionStatus(final ChannelHandlerContext channelHandlerContext) { + SslHandler sslHandler = null; + for (final Map.Entry<String, ChannelHandler> entry : channelHandlerContext.channel().pipeline()) { + final ChannelHandler channelHandler = entry.getValue(); + if (channelHandler instanceof SslHandler) { + sslHandler = (SslHandler) channelHandler; + break; + } + } + return sslHandler == null ? null : createSslSessionStatusFromSslHandler(sslHandler); + } + + private SslSessionStatus createSslSessionStatusFromSslHandler(final SslHandler sslHandler) { + final SSLSession sslSession = sslHandler.engine().getSession(); + SslSessionStatus sslSessionStatus = null; + try { + final Certificate[] certificates = sslSession.getPeerCertificates(); + if (certificates.length > 0) { + final X509Certificate certificate = (X509Certificate) certificates[0]; + final X500Principal subject = certificate.getSubjectX500Principal(); + final X500Principal issuer = certificate.getIssuerX500Principal(); + sslSessionStatus = new SslSessionStatus(subject, issuer); + } + } catch (final SSLPeerUnverifiedException peerUnverifiedException) { + logger.debug("Peer Unverified", peerUnverifiedException); + } + + return sslSessionStatus; + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java index fc7695b37a..b30ce88223 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java @@ -30,6 +30,7 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.event.transport.EventException; import org.apache.nifi.event.transport.EventServer; +import org.apache.nifi.event.transport.SslSessionStatus; import org.apache.nifi.event.transport.configuration.BufferAllocator; import org.apache.nifi.event.transport.configuration.TransportProtocol; import org.apache.nifi.event.transport.message.ByteArrayMessage; @@ -74,12 +75,24 @@ import java.util.concurrent.LinkedBlockingQueue; "as the message demarcator. The default behavior is for each message to produce a single FlowFile, however this can " + "be controlled by increasing the Batch Size to a larger value for higher throughput. The Receive Buffer Size must be " + "set as large as the largest messages expected to be received, meaning if every 100kb there is a line separator, then " + - "the Receive Buffer Size must be greater than 100kb.") + "the Receive Buffer Size must be greater than 100kb. " + + "The processor can be configured to use an SSL Context Service to only allow secure connections. " + + "When connected clients present certificates for mutual TLS authentication, the Distinguished Names of the client certificate's " + + "issuer and subject are added to the outgoing FlowFiles as attributes. " + + "The processor does not perform authorization based on Distinguished Name values, but since these values " + + "are attached to the outgoing FlowFiles, authorization can be implemented based on these attributes.") @WritesAttributes({ @WritesAttribute(attribute="tcp.sender", description="The sending host of the messages."), - @WritesAttribute(attribute="tcp.port", description="The sending port the messages were received.") + @WritesAttribute(attribute="tcp.port", description="The sending port the messages were received."), + @WritesAttribute(attribute="client.certificate.issuer.dn", description="For connections using mutual TLS, the Distinguished Name of the " + + "Certificate Authority that issued the client's certificate " + + "is attached to the FlowFile."), + @WritesAttribute(attribute="client.certificate.subject.dn", description="For connections using mutual TLS, the Distinguished Name of the " + + "client certificate's owner (subject) is attached to the FlowFile.") }) public class ListenTCP extends AbstractProcessor { + private static final String CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE = "client.certificate.subject.dn"; + private static final String CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE = "client.certificate.issuer.dn"; public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") @@ -213,6 +226,7 @@ public class ListenTCP extends AbstractProcessor { } final Map<String,String> attributes = getAttributes(entry.getValue()); + addClientCertificateAttributes(attributes, events.get(0)); flowFile = session.putAllAttributes(flowFile, attributes); getLogger().debug("Transferring {} to success", flowFile); @@ -291,4 +305,12 @@ public class ListenTCP extends AbstractProcessor { } return eventBatcher; } + + private void addClientCertificateAttributes(final Map<String, String> attributes, final ByteArrayMessage event) { + final SslSessionStatus sslSessionStatus = event.getSslSessionStatus(); + if (sslSessionStatus != null) { + attributes.put(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE, sslSessionStatus.getSubject().getName()); + attributes.put(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE, sslSessionStatus.getIssuer().getName()); + } + } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java index 327dc235a1..91549d07a3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java @@ -33,9 +33,9 @@ import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.apache.nifi.web.util.ssl.SslContextUtils; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.mockito.Mockito; import javax.net.ssl.SSLContext; @@ -45,6 +45,8 @@ import java.util.ArrayList; import java.util.List; public class TestListenTCP { + private static final String CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE = "client.certificate.subject.dn"; + private static final String CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE = "client.certificate.issuer.dn"; private static final String SSL_CONTEXT_IDENTIFIER = SSLContextService.class.getName(); private static final String LOCALHOST = "localhost"; @@ -56,13 +58,13 @@ public class TestListenTCP { private TestRunner runner; - @BeforeClass + @BeforeAll public static void configureServices() throws TlsException { keyStoreSslContext = SslContextUtils.createKeyStoreSslContext(); trustStoreSslContext = SslContextUtils.createTrustStoreSslContext(); } - @Before + @BeforeEach public void setup() { runner = TestRunners.newTestRunner(ListenTCP.class); } @@ -122,6 +124,7 @@ public class TestListenTCP { @Test public void testRunClientAuthRequired() throws Exception { + final String expectedDistinguishedName = "CN=localhost"; runner.setProperty(ListenTCP.CLIENT_AUTH, ClientAuth.REQUIRED.name()); enableSslContextService(keyStoreSslContext); @@ -137,6 +140,10 @@ public class TestListenTCP { List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS); for (int i = 0; i < mockFlowFiles.size(); i++) { mockFlowFiles.get(i).assertContentEquals("This is message " + (i + 1)); + mockFlowFiles.get(i).assertAttributeExists(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE); + mockFlowFiles.get(i).assertAttributeExists(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE); + mockFlowFiles.get(i).assertAttributeEquals(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE, expectedDistinguishedName); + mockFlowFiles.get(i).assertAttributeEquals(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE, expectedDistinguishedName); } } @@ -157,6 +164,8 @@ public class TestListenTCP { List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS); for (int i = 0; i < mockFlowFiles.size(); i++) { mockFlowFiles.get(i).assertContentEquals("This is message " + (i + 1)); + mockFlowFiles.get(i).assertAttributeNotExists(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE); + mockFlowFiles.get(i).assertAttributeNotExists(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE); } }