[ https://issues.apache.org/jira/browse/NIFI-2615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15664266#comment-15664266 ]

ASF GitHub Bot commented on NIFI-2615:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1177#discussion_r87829391
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetTCP.java ---
    @@ -0,0 +1,668 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.net.Socket;
    +import java.net.SocketAddress;
    +import java.net.SocketTimeoutException;
    +import java.net.StandardSocketOptions;
    +import java.nio.ByteBuffer;
    +import java.nio.CharBuffer;
    +import java.nio.channels.AlreadyConnectedException;
    +import java.nio.channels.ConnectionPendingException;
    +import java.nio.channels.SocketChannel;
    +import java.nio.channels.UnresolvedAddressException;
    +import java.nio.channels.UnsupportedAddressTypeException;
    +import java.nio.charset.Charset;
    +import java.nio.charset.CharsetDecoder;
    +import java.nio.charset.StandardCharsets;
    +import java.time.Instant;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +@SupportsBatching
    +@SideEffectFree
    +@Tags({"get", "fetch", "poll", "tcp", "ingest", "source", "input"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("Connects over TCP to the provided server. When receiving data this will writes either the" +
    +        " full receive buffer or messages based on demarcator to the content of a FlowFile. ")
    +public class GetTCP extends AbstractProcessor {
    +
    +    private static final Validator ENDPOINT_VALIDATOR = new Validator() {
    +        @Override
    +        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
    +            if (null == value || value.isEmpty()) {
    +                return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(subject + " cannot be empty").build();
    +            }
    +            //The format should be <host>:<port>{,<host>:<port>}
    +            //first split on ,
    +            final String[] hostPortPairs = value.split(",");
    +            boolean validHostPortPairs = true;
    +            String reason = "";
    +            String offendingSubject = subject;
    +
    +            if(0 == hostPortPairs.length){
    +                return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(offendingSubject + " cannot be empty").build();
    +            }
    +            for (final String hostPortPair : hostPortPairs) {
    +                offendingSubject = hostPortPair;
    +                //split pair
    +                if (hostPortPair.isEmpty()) {
    +                    validHostPortPairs = false;
    +                    reason = "endpoint is empty";
    +                    break;
    +                }
    +                if (!hostPortPair.contains(":")) {
    +                    validHostPortPairs = false;
    +                    reason = "endpoint pair does not contain valid delimiter";
    +                    break;
    +                }
    +                final String[] parts = hostPortPair.split(":");
    +
    +                if (1 == parts.length) {
    +                    validHostPortPairs = false;
    +                    reason = "could not determine the port";
    +                    break;
    +                } else {
    +                    try {
    +                        final int intVal = Integer.parseInt(parts[1]);
    +
    +                        if (intVal <= 0) {
    +                            reason = "not a positive value";
    +                            validHostPortPairs = false;
    +                            break;
    +                        }
    +                    } catch (final NumberFormatException e) {
    +                        reason = "not a valid integer";
    +                        validHostPortPairs = false;
    +                        break;
    +                    }
    +                }
    +                //if we already have a bad pair then exit now.
    +                if (!validHostPortPairs) {
    +                    break;
    +                }
    +            }
    +            return new ValidationResult.Builder().subject(offendingSubject).input(value).explanation(reason).valid(validHostPortPairs).build();
    +        }
    +    };
    +
    +    public static final PropertyDescriptor ENDPOINT_LIST = new PropertyDescriptor
    +            .Builder().name("Endpoint List")
    +            .description("A comma delimited list of the servers to connect to. The format should be " +
    +                    "<server_address>:<port>. Only one server will be connected to at a time, the others " +
    +                    "will be used as fail overs.")
    +            .required(true)
    +            .addValidator(ENDPOINT_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor FAILOVER_ENDPOINT = new PropertyDescriptor
    +            .Builder().name("Failover Endpoint")
    +            .description("A failover server to connect to if one of the main ones is unreachable after the connection " +
    +                    "attempt count. The format should be <server_address>:<port>.")
    +            .required(false)
    +            // .defaultValue("")
    +            .addValidator(ENDPOINT_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Connection Timeout")
    +            .description("The amount of time to wait before timing out while creating a connection")
    +            .required(false)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("5 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor CONNECTION_ATTEMPT_COUNT = new PropertyDescriptor.Builder()
    +            .name("Connection Attempt Count")
    +            .description("The number of times to try and establish a connection, before using a backup host if available." +
    +                    " This same attempt count would be used for a backup host as well.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("3")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Number of Messages in Batch")
    +            .description("The number of messages to write to the flow file content")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("10")
    +            .build();
    +
    +
    +    public static final PropertyDescriptor RECEIVE_BUFFER_SIZE = new PropertyDescriptor
    +            .Builder().name("Receive Buffer Size")
    +            .description("The size of the buffer to receive data in")
    +            .required(false)
    +            .defaultValue("2048")
    +            .addValidator(StandardValidators.createLongValidator(1, 2048, true))
    +            .build();
    +
    +    public static final PropertyDescriptor KEEP_ALIVE = new PropertyDescriptor
    +            .Builder().name("Keep Alive")
    +            .description("This determines if TCP keep alive is used.")
    +            .required(false)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("Success")
    +            .description("The relationship that all sucessful messages from the WebSocket will be sent to")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("Failure")
    +            .description("The relationship that all failed messages from the WebSocket will be sent to")
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +    private final static Set<Relationship> relationships;
    +    private final static Charset charset = Charset.forName(StandardCharsets.UTF_8.name());
    +    private final Map<String, String> dynamicAttributes = new HashMap<>();
    +    private volatile Set<String> dynamicPropertyNames = new HashSet<>();
    +
    +    private AtomicBoolean connectedToBackup = new AtomicBoolean();
    +
    +
    +    /*
    +    * Will ensure that the list of property descriptors is build only once.
    +    * Will also create a Set of relationships
    +    */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(ENDPOINT_LIST);
    +        _propertyDescriptors.add(FAILOVER_ENDPOINT);
    +        _propertyDescriptors.add(CONNECTION_TIMEOUT);
    +        _propertyDescriptors.add(CONNECTION_ATTEMPT_COUNT);
    +        _propertyDescriptors.add(BATCH_SIZE);
    +        _propertyDescriptors.add(RECEIVE_BUFFER_SIZE);
    +        _propertyDescriptors.add(KEEP_ALIVE);
    +
    +
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    private Map<SocketRecveiverThread, Future> socketToFuture = new HashMap<>();
    +    private ExecutorService executorService;
    +    private transient ComponentLog log = getLogger();
    +    private transient String originalServerAddressList;
    +    private transient String backupServer;
    +    private transient int batchSize;
    +
    +    /**
    +     * Bounded queue of messages events from the socket.
    +     */
    +    protected final BlockingQueue<String> socketMessagesReceived = new ArrayBlockingQueue<>(256);
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .required(false)
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .dynamic(true)
    +                .expressionLanguageSupported(true)
    +                .build();
    +    }
    +
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) throws ProcessException {
    +
    +
    +        final int rcvBufferSize = context.getProperty(RECEIVE_BUFFER_SIZE).asInteger();
    +        final boolean keepAlive = context.getProperty(KEEP_ALIVE).asBoolean();
    +        final int connectionTimeout = context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int connectionRetryCount = context.getProperty(CONNECTION_ATTEMPT_COUNT).asInteger();
    +        originalServerAddressList = context.getProperty(ENDPOINT_LIST).getValue();
    +        final String[] serverAddresses = originalServerAddressList.split(",");
    +        backupServer = context.getProperty(FAILOVER_ENDPOINT).getValue();
    +        executorService = Executors.newFixedThreadPool(serverAddresses.length);
    +        batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +
    +        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                dynamicAttributes.put(descriptor.getName(), entry.getValue());
    +
    +            }
    +        }
    --- End diff --
    
    Since dynamic properties are not used anywhere, you might as well remove the above loop.
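    
    For context, when NiFi processors do keep dynamic properties, the usual pattern is to copy them onto each emitted FlowFile as attributes in onTrigger. A minimal, hypothetical sketch of that pattern (not part of this PR; it assumes the dynamicAttributes map and socketMessagesReceived queue declared above):
    
        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
            // Drain one received message; yield if nothing has arrived yet.
            final String message = socketMessagesReceived.poll();
            if (message == null) {
                context.yield();
                return;
            }
            FlowFile flowFile = session.create();
            flowFile = session.write(flowFile, out -> out.write(message.getBytes(StandardCharsets.UTF_8)));
            // Surface the user-defined (dynamic) properties as FlowFile attributes.
            flowFile = session.putAllAttributes(flowFile, dynamicAttributes);
            session.transfer(flowFile, REL_SUCCESS);
        }
    
    As submitted, nothing reads dynamicAttributes after the loop populates it, so either dropping the loop or wiring the map into the onTrigger path would resolve the dead code.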


> Add support for GetTCP processor
> --------------------------------
>
>                 Key: NIFI-2615
>                 URL: https://issues.apache.org/jira/browse/NIFI-2615
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>    Affects Versions: 1.0.0, 0.7.0, 0.6.1
>            Reporter: Andrew Psaltis
>            Assignee: Andrew Psaltis
>
> This processor will allow NiFi to connect to a host via TCP, thus acting as 
> the client, and consume data. This should provide the following properties:
> * Endpoint - this should accept a list of addresses in the format 
> <Address>:<Port>. If a user wants to track the ingestion rate per address, 
> this list should contain only one address. However, there are times when 
> multiple endpoints represent a single logical entity and the aggregate 
> ingestion rate is representative of it. 
> * Failover Endpoint - An endpoint to fail over to if the list of Endpoints is 
> exhausted and a connection cannot be made to any of them, or the connection 
> is lost and cannot be re-established.
> * Receive Buffer Size -- The size of the TCP receive buffer to use. This does 
> not relate to the size of content in the resulting flow file.
> * Keep Alive -- This enables TCP keep-alive.
> * Connection Timeout -- How long to wait when trying to establish a connection
> * Batch Size - The number of messages to put into a FlowFile. This will be 
> the maximum number of messages, as there may be cases where the number of 
> messages received over the wire and waiting to be emitted as FlowFile content 
> is less than the desired batch size.
> This processor should also support the following:
> 1. If a connection to an endpoint is broken, it should be logged and 
> reconnection attempts should be made, potentially using an exponential 
> backoff strategy (a sketch of one such strategy follows below). The strategy, 
> if there is more than one, should be documented and potentially exposed as an 
> Attribute.
> 2. When there are multiple instances of this processor in a flow and NiFi is 
> set up in a cluster, this processor needs to ensure that received messages are 
> not processed twice. For example, if this processor is configured to point to 
> the endpoint (172.31.32.212:10000) and the data flow is running on more than 
> one node, then only one node should be processing data. In essence the nodes 
> should form a group, with semantics similar to a Kafka consumer group.
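
For requirement 1 above, a minimal sketch of a reconnect loop with exponential backoff, using the Connection Timeout, Keep Alive, and Connection Attempt Count settings from the property list. This is illustrative only; the class and method names here are assumptions, not the PR's implementation:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    final class ReconnectingTcpClient {

        // Hypothetical helper: tries one endpoint up to maxAttempts times,
        // doubling the delay between attempts (capped), then gives up so the
        // caller can move on to the failover endpoint.
        static Socket connect(final String host, final int port, final int maxAttempts,
                              final int connectTimeoutMillis) throws IOException, InterruptedException {
            long backoffMillis = 500L;
            IOException lastFailure = new IOException("no connection attempt was made");
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                final Socket socket = new Socket();
                try {
                    socket.setKeepAlive(true); // "Keep Alive" property
                    socket.connect(new InetSocketAddress(host, port), connectTimeoutMillis); // "Connection Timeout"
                    return socket;
                } catch (final IOException e) {
                    socket.close();
                    lastFailure = e;
                    Thread.sleep(backoffMillis);
                    backoffMillis = Math.min(backoffMillis * 2, 30_000L); // exponential backoff, capped at 30s
                }
            }
            throw lastFailure; // attempts exhausted; the caller can try the Failover Endpoint next
        }
    }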



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
