[ https://issues.apache.org/jira/browse/NIFI-2615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15664266#comment-15664266 ]
ASF GitHub Bot commented on NIFI-2615:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1177#discussion_r87829391

--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetTCP.java ---
@@ -0,0 +1,668 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.channels.AlreadyConnectedException;
+import java.nio.channels.ConnectionPendingException;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.UnresolvedAddressException;
+import java.nio.channels.UnsupportedAddressTypeException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@SupportsBatching
+@SideEffectFree
+@Tags({"get", "fetch", "poll", "tcp", "ingest", "source", "input"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Connects over TCP to the provided server. When receiving data, this will write either the"
+        + " full receive buffer or messages split on a demarcator to the content of a FlowFile.")
+public class GetTCP extends AbstractProcessor {
+
+    private static final Validator ENDPOINT_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            if (null == value || value.isEmpty()) {
+                return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(subject + " cannot be empty").build();
+            }
+            // The format should be <host>:<port>{,<host>:<port>}
+            // first split on ,
+            final String[] hostPortPairs = value.split(",");
+            boolean validHostPortPairs = true;
+            String reason = "";
+            String offendingSubject = subject;
+
+            if (0 == hostPortPairs.length) {
+                return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(offendingSubject + " cannot be empty").build();
+            }
+            for (final String hostPortPair : hostPortPairs) {
+                offendingSubject = hostPortPair;
+                // split pair
+                if (hostPortPair.isEmpty()) {
+                    validHostPortPairs = false;
+                    reason = "endpoint is empty";
+                    break;
+                }
+                if (!hostPortPair.contains(":")) {
+                    validHostPortPairs = false;
+                    reason = "endpoint pair does not contain valid delimiter";
+                    break;
+                }
+                final String[] parts = hostPortPair.split(":");
+
+                if (1 == parts.length) {
+                    validHostPortPairs = false;
+                    reason = "could not determine the port";
+                    break;
+                } else {
+                    try {
+                        final int intVal = Integer.parseInt(parts[1]);
+
+                        if (intVal <= 0) {
+                            reason = "not a positive value";
+                            validHostPortPairs = false;
+                            break;
+                        }
+                    } catch (final NumberFormatException e) {
+                        reason = "not a valid integer";
+                        validHostPortPairs = false;
+                        break;
+                    }
+                }
+                // if we already have a bad pair then exit now.
+                if (!validHostPortPairs) {
+                    break;
+                }
+            }
+            return new ValidationResult.Builder().subject(offendingSubject).input(value).explanation(reason).valid(validHostPortPairs).build();
+        }
+    };
+
+    public static final PropertyDescriptor ENDPOINT_LIST = new PropertyDescriptor
+            .Builder().name("Endpoint List")
+            .description("A comma-delimited list of the servers to connect to. The format should be "
+                    + "<server_address>:<port>. Only one server will be connected to at a time; the others "
+                    + "will be used as failovers.")
+            .required(true)
+            .addValidator(ENDPOINT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor FAILOVER_ENDPOINT = new PropertyDescriptor
+            .Builder().name("Failover Endpoint")
+            .description("A failover server to connect to if one of the main ones is unreachable after the connection "
+                    + "attempt count. The format should be <server_address>:<port>.")
+            .required(false)
+            // .defaultValue("")
+            .addValidator(ENDPOINT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Connection Timeout")
+            .description("The amount of time to wait before timing out while creating a connection")
+            .required(false)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("5 sec")
+            .build();
+
+    public static final PropertyDescriptor CONNECTION_ATTEMPT_COUNT = new PropertyDescriptor.Builder()
+            .name("Connection Attempt Count")
+            .description("The number of times to try to establish a connection before using a backup host, if available."
+                    + " The same attempt count is used for a backup host as well.")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("3")
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Number of Messages in Batch")
+            .description("The number of messages to write to the FlowFile content")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("10")
+            .build();
+
+    public static final PropertyDescriptor RECEIVE_BUFFER_SIZE = new PropertyDescriptor
+            .Builder().name("Receive Buffer Size")
+            .description("The size of the buffer to receive data in")
+            .required(false)
+            .defaultValue("2048")
+            .addValidator(StandardValidators.createLongValidator(1, 2048, true))
+            .build();
+
+    public static final PropertyDescriptor KEEP_ALIVE = new PropertyDescriptor
+            .Builder().name("Keep Alive")
+            .description("This determines if TCP keep-alive is used.")
+            .required(false)
+            .defaultValue("true")
+            .allowableValues("true", "false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("Success")
+            .description("The relationship that all successful messages from the socket will be sent to")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("Failure")
+            .description("The relationship that all failed messages from the socket will be sent to")
+            .build();
+
+    private final static List<PropertyDescriptor> propertyDescriptors;
+    private final static Set<Relationship> relationships;
+    private final static Charset charset = Charset.forName(StandardCharsets.UTF_8.name());
+    private final Map<String, String> dynamicAttributes = new HashMap<>();
+    private volatile Set<String> dynamicPropertyNames = new HashSet<>();
+
+    private AtomicBoolean connectedToBackup = new AtomicBoolean();
+
+    /*
+     * Will ensure that the list of property descriptors is built only once.
+     * Will also create a Set of relationships.
+     */
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.add(ENDPOINT_LIST);
+        _propertyDescriptors.add(FAILOVER_ENDPOINT);
+        _propertyDescriptors.add(CONNECTION_TIMEOUT);
+        _propertyDescriptors.add(CONNECTION_ATTEMPT_COUNT);
+        _propertyDescriptors.add(BATCH_SIZE);
+        _propertyDescriptors.add(RECEIVE_BUFFER_SIZE);
+        _propertyDescriptors.add(KEEP_ALIVE);
+
+        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+        Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    private Map<SocketRecveiverThread, Future> socketToFuture = new HashMap<>();
+    private ExecutorService executorService;
+    private transient ComponentLog log = getLogger();
+    private transient String originalServerAddressList;
+    private transient String backupServer;
+    private transient int batchSize;
+
+    /**
+     * Bounded queue of message events from the socket.
+     */
+    protected final BlockingQueue<String> socketMessagesReceived = new ArrayBlockingQueue<>(256);
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .required(false)
+                .name(propertyDescriptorName)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .dynamic(true)
+                .expressionLanguageSupported(true)
+                .build();
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws ProcessException {
+
+        final int rcvBufferSize = context.getProperty(RECEIVE_BUFFER_SIZE).asInteger();
+        final boolean keepAlive = context.getProperty(KEEP_ALIVE).asBoolean();
+        final int connectionTimeout = context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+        final int connectionRetryCount = context.getProperty(CONNECTION_ATTEMPT_COUNT).asInteger();
+        originalServerAddressList = context.getProperty(ENDPOINT_LIST).getValue();
+        final String[] serverAddresses = originalServerAddressList.split(",");
+        backupServer = context.getProperty(FAILOVER_ENDPOINT).getValue();
+        executorService = Executors.newFixedThreadPool(serverAddresses.length);
+        batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+            final PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.isDynamic()) {
+                dynamicAttributes.put(descriptor.getName(), entry.getValue());
+            }
+        }
--- End diff --

Since dynamic properties are not used anywhere, you might as well remove the above loop.
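
For reference, if the dynamic properties were meant to be kept rather than removed, the usual NiFi pattern is to copy them onto each emitted FlowFile as attributes. The onTrigger below is a minimal sketch of that pattern only, assuming the fields declared in the class above; it is not the PR's actual onTrigger implementation.

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final String message = socketMessagesReceived.poll();
        if (message == null) {
            context.yield();
            return;
        }
        FlowFile flowFile = session.create();
        flowFile = session.write(flowFile, out -> out.write(message.getBytes(StandardCharsets.UTF_8)));
        // Hypothetical use of the dynamicAttributes map populated in onScheduled:
        // each user-defined dynamic property becomes an attribute on the FlowFile.
        flowFile = session.putAllAttributes(flowFile, dynamicAttributes);
        session.transfer(flowFile, REL_SUCCESS);
    }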
> Add support for GetTCP processor
> --------------------------------
>
>                 Key: NIFI-2615
>                 URL: https://issues.apache.org/jira/browse/NIFI-2615
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>    Affects Versions: 1.0.0, 0.7.0, 0.6.1
>            Reporter: Andrew Psaltis
>            Assignee: Andrew Psaltis
>
> This processor will allow NiFi to connect to a host via TCP, thus acting as
> the client, and consume data. It should provide the following properties:
> * Endpoint - this should accept a list of addresses in the format of
> <Address>:<Port>. If a user wants to track the ingestion rate per address,
> there should be only one address in this list. However, there are times when
> multiple endpoints represent a logical entity and the aggregate ingestion
> rate is representative of it.
> * Failover Endpoint - An endpoint to fail over to if the list of Endpoints is
> exhausted and a connection cannot be made to any of them, or if the
> connection is dropped and cannot be re-established.
> * Receive Buffer Size - The size of the TCP receive buffer to use. This does
> not relate to the size of the content in the resulting flow file.
> * Keep Alive - This enables TCP keep-alive.
> * Connection Timeout - How long to wait when trying to establish a connection.
> * Batch Size - The number of messages to put into a FlowFile. This will be
> the maximum number of messages, as there may be cases where the number of
> messages received over the wire and waiting to be emitted as FlowFile content
> is less than the desired batch size.
> This processor should also support the following:
> 1. If a connection to an endpoint is broken, it should be logged and
> reconnection attempts should be made. Potentially an exponential backoff
> strategy will be used (see the sketch after this quote). If more than one
> strategy exists, the choice should be documented and potentially exposed as
> an Attribute.
> 2. When there are multiple instances of this processor in a flow and NiFi is
> set up in a cluster, this processor needs to ensure that received messages
> are not processed twice. For example, if this processor is configured to
> point to the endpoint (172.31.32.212:10000) and the data flow is running on
> more than one node, then only one node should be processing data. In essence
> the nodes should form a group with semantics similar to a Kafka consumer
> group.
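
As a rough illustration of the reconnection behavior described in item 1 above, here is a minimal, self-contained sketch of a capped exponential backoff between connect attempts. All names and values here (MAX_ATTEMPTS, BASE_DELAY_MS, and so on) are hypothetical and not part of the PR; the real processor would presumably derive them from the Connection Timeout and Connection Attempt Count properties.

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SocketChannel;

    public class BackoffReconnect {
        private static final int MAX_ATTEMPTS = 5;       // hypothetical; cf. Connection Attempt Count
        private static final long BASE_DELAY_MS = 500;   // first wait between attempts
        private static final long MAX_DELAY_MS = 30_000; // cap so the wait never grows unbounded

        // Returns an open channel, or null once the attempts are exhausted so the
        // caller can fall over to the failover endpoint.
        static SocketChannel connectWithBackoff(final String host, final int port) throws InterruptedException {
            long delay = BASE_DELAY_MS;
            for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
                try {
                    return SocketChannel.open(new InetSocketAddress(host, port));
                } catch (IOException e) {
                    if (attempt == MAX_ATTEMPTS) {
                        break; // this endpoint is exhausted
                    }
                    Thread.sleep(delay);
                    delay = Math.min(delay * 2, MAX_DELAY_MS); // double the wait, capped
                }
            }
            return null;
        }
    }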