exceptionfactory commented on a change in pull request #5044:
URL: https://github.com/apache/nifi/pull/5044#discussion_r632534325



##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.pool.ChannelHealthChecker;
+import io.netty.channel.pool.ChannelPool;
+import io.netty.channel.pool.ChannelPoolHandler;
+import io.netty.channel.pool.FixedChannelPool;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.nifi.event.transport.EventSender;
+import org.apache.nifi.event.transport.EventSenderFactory;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import 
org.apache.nifi.event.transport.netty.channel.pool.InitializingChannelPoolHandler;
+import 
org.apache.nifi.event.transport.netty.channel.ssl.ClientSslStandardChannelInitializer;
+import 
org.apache.nifi.event.transport.netty.channel.StandardChannelInitializer;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+/**
+ * Netty Event Sender Factory
+ */
+public class NettyEventSenderFactory<T> extends EventLoopGroupFactory 
implements EventSenderFactory<T> {
+    private static final int MAX_PENDING_ACQUIRES = 1024;
+
+    private Duration timeout = Duration.ofSeconds(30);
+
+    private int maxConnections = Runtime.getRuntime().availableProcessors() * 
2;
+
+    private Supplier<List<ChannelHandler>> handlerSupplier = () -> 
Collections.emptyList();
+
+    private final String address;
+
+    private final int port;
+
+    private final TransportProtocol protocol;
+
+    private SSLContext sslContext;
+
+    public NettyEventSenderFactory(final String address, final int port, final 
TransportProtocol protocol) {
+        this.address = address;
+        this.port = port;
+        this.protocol = protocol;
+    }
+
+    /**
+     * Set Channel Handler Supplier
+     *
+     * @param handlerSupplier Channel Handler Supplier
+     */
+    public void setHandlerSupplier(final Supplier<List<ChannelHandler>> 
handlerSupplier) {
+        this.handlerSupplier = Objects.requireNonNull(handlerSupplier);
+    }
+
+    /**
+     * Set SSL Context to enable TLS Channel Handler
+     *
+     * @param sslContext SSL Context
+     */
+    public void setSslContext(final SSLContext sslContext) {
+        this.sslContext = sslContext;
+    }
+
+    /**
+     * Set Timeout for Connections and Communication
+     *
+     * @param timeout Timeout Duration
+     */
+    public void setTimeout(final Duration timeout) {
+        this.timeout = Objects.requireNonNull(timeout, "Timeout required");
+    }
+
+    /**
+     * Set Maximum Connections for Channel Pool
+     *
+     * @param maxConnections Maximum Number of connections defaults to 
available processors multiplied by 2
+     */
+    public void setMaxConnections(final int maxConnections) {
+        this.maxConnections = maxConnections;
+    }
+
+    /**
+     * Get Event Sender with connected Channel
+     *
+     * @return Connected Event Sender
+     */
+    public EventSender<T> getEventSender() {
+        final Bootstrap bootstrap = new Bootstrap();
+        bootstrap.remoteAddress(new InetSocketAddress(address, port));
+        final EventLoopGroup group = getEventLoopGroup();
+        bootstrap.group(group);
+
+        if (TransportProtocol.UDP.equals(protocol)) {
+            bootstrap.channel(NioDatagramChannel.class);
+        } else {
+            bootstrap.channel(NioSocketChannel.class);
+        }
+
+        setChannelOptions(bootstrap);
+        return getConfiguredEventSender(bootstrap);
+    }
+
+    private void setChannelOptions(final Bootstrap bootstrap) {
+        final int timeoutMilliseconds = (int) timeout.toMillis();
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
timeoutMilliseconds);
+    }
+
+    private EventSender<T> getConfiguredEventSender(final Bootstrap bootstrap) 
{
+        final SocketAddress remoteAddress = bootstrap.config().remoteAddress();
+        final ChannelPool channelPool = getChannelPool(bootstrap);
+        return new NettyEventSender<>(bootstrap.config().group(), channelPool, 
remoteAddress);
+    }
+
+    private ChannelPool getChannelPool(final Bootstrap bootstrap) {
+        final ChannelInitializer<Channel> channelInitializer = 
getChannelInitializer();
+        final ChannelPoolHandler handler = new 
InitializingChannelPoolHandler(channelInitializer);
+        return new FixedChannelPool(bootstrap,
+                handler,
+                ChannelHealthChecker.ACTIVE,
+                FixedChannelPool.AcquireTimeoutAction.FAIL,
+                timeout.toMillis(),
+                maxConnections,
+                MAX_PENDING_ACQUIRES);
+    }
+
+    private ChannelInitializer<Channel> getChannelInitializer() {
+        StandardChannelInitializer<Channel> channelInitializer;
+        if (sslContext == null) {

Review comment:
       Sounds good, will update.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to