Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/1177#discussion_r87829391 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetTCP.java --- @@ -0,0 +1,668 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.channels.AlreadyConnectedException; +import java.nio.channels.ConnectionPendingException; +import java.nio.channels.SocketChannel; +import java.nio.channels.UnresolvedAddressException; +import java.nio.channels.UnsupportedAddressTypeException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import 
java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +@SupportsBatching +@SideEffectFree +@Tags({"get", "fetch", "poll", "tcp", "ingest", "source", "input"}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@CapabilityDescription("Connects over TCP to the provided server. When receiving data this will writes either the" + + " full receive buffer or messages based on demarcator to the content of a FlowFile. ") +public class GetTCP extends AbstractProcessor { + + private static final Validator ENDPOINT_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + if (null == value || value.isEmpty()) { + return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(subject + " cannot be empty").build(); + } + //The format should be <host>:<port>{,<host>:<port>} + //first split on , + final String[] hostPortPairs = value.split(","); + boolean validHostPortPairs = true; + String reason = ""; + String offendingSubject = subject; + + if(0 == hostPortPairs.length){ + return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(offendingSubject + " cannot be empty").build(); + } + for (final String hostPortPair : hostPortPairs) { + offendingSubject = hostPortPair; + //split pair + if (hostPortPair.isEmpty()) { + validHostPortPairs = false; + reason = "endpoint is empty"; + break; + } + if (!hostPortPair.contains(":")) { + validHostPortPairs = false; + reason = "endpoint pair does not contain valid delimiter"; + break; + } + final String[] parts = hostPortPair.split(":"); + + if (1 == parts.length) { + validHostPortPairs = false; + reason = "could not determine 
the port"; + break; + } else { + try { + final int intVal = Integer.parseInt(parts[1]); + + if (intVal <= 0) { + reason = "not a positive value"; + validHostPortPairs = false; + break; + } + } catch (final NumberFormatException e) { + reason = "not a valid integer"; + validHostPortPairs = false; + break; + } + } + //if we already have a bad pair then exit now. + if (!validHostPortPairs) { + break; + } + } + return new ValidationResult.Builder().subject(offendingSubject).input(value).explanation(reason).valid(validHostPortPairs).build(); + } + }; + + public static final PropertyDescriptor ENDPOINT_LIST = new PropertyDescriptor + .Builder().name("Endpoint List") + .description("A comma delimited list of the servers to connect to. The format should be " + + "<server_address>:<port>. Only one server will be connected to at a time, the others " + + "will be used as fail overs.") + .required(true) + .addValidator(ENDPOINT_VALIDATOR) + .build(); + + public static final PropertyDescriptor FAILOVER_ENDPOINT = new PropertyDescriptor + .Builder().name("Failover Endpoint") + .description("A failover server to connect to if one of the main ones is unreachable after the connection " + + "attempt count. The format should be <server_address>:<port>.") + .required(false) + // .defaultValue("") + .addValidator(ENDPOINT_VALIDATOR) + .build(); + + + public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder() + .name("Connection Timeout") + .description("The amount of time to wait before timing out while creating a connection") + .required(false) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("5 sec") + .build(); + + public static final PropertyDescriptor CONNECTION_ATTEMPT_COUNT = new PropertyDescriptor.Builder() + .name("Connection Attempt Count") + .description("The number of times to try and establish a connection, before using a backup host if available." 
+ + " This same attempt count would be used for a backup host as well.") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("3") + .build(); + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Number of Messages in Batch") + .description("The number of messages to write to the flow file content") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("10") + .build(); + + + public static final PropertyDescriptor RECEIVE_BUFFER_SIZE = new PropertyDescriptor + .Builder().name("Receive Buffer Size") + .description("The size of the buffer to receive data in") + .required(false) + .defaultValue("2048") + .addValidator(StandardValidators.createLongValidator(1, 2048, true)) + .build(); + + public static final PropertyDescriptor KEEP_ALIVE = new PropertyDescriptor + .Builder().name("Keep Alive") + .description("This determines if TCP keep alive is used.") + .required(false) + .defaultValue("true") + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("Success") + .description("The relationship that all sucessful messages from the WebSocket will be sent to") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("Failure") + .description("The relationship that all failed messages from the WebSocket will be sent to") + .build(); + + private final static List<PropertyDescriptor> propertyDescriptors; + private final static Set<Relationship> relationships; + private final static Charset charset = Charset.forName(StandardCharsets.UTF_8.name()); + private final Map<String, String> dynamicAttributes = new HashMap<>(); + private volatile Set<String> dynamicPropertyNames = new HashSet<>(); + + private AtomicBoolean connectedToBackup = new AtomicBoolean(); + + + /* + * Will 
ensure that the list of property descriptors is build only once. + * Will also create a Set of relationships + */ + static { + List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(ENDPOINT_LIST); + _propertyDescriptors.add(FAILOVER_ENDPOINT); + _propertyDescriptors.add(CONNECTION_TIMEOUT); + _propertyDescriptors.add(CONNECTION_ATTEMPT_COUNT); + _propertyDescriptors.add(BATCH_SIZE); + _propertyDescriptors.add(RECEIVE_BUFFER_SIZE); + _propertyDescriptors.add(KEEP_ALIVE); + + + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + Set<Relationship> _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_relationships); + } + + private Map<SocketRecveiverThread, Future> socketToFuture = new HashMap<>(); + private ExecutorService executorService; + private transient ComponentLog log = getLogger(); + private transient String originalServerAddressList; + private transient String backupServer; + private transient int batchSize; + + /** + * Bounded queue of messages events from the socket. 
+ */ + protected final BlockingQueue<String> socketMessagesReceived = new ArrayBlockingQueue<>(256); + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .required(false) + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .expressionLanguageSupported(true) + .build(); + } + + + @OnScheduled + public void onScheduled(final ProcessContext context) throws ProcessException { + + + final int rcvBufferSize = context.getProperty(RECEIVE_BUFFER_SIZE).asInteger(); + final boolean keepAlive = context.getProperty(KEEP_ALIVE).asBoolean(); + final int connectionTimeout = context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + final int connectionRetryCount = context.getProperty(CONNECTION_ATTEMPT_COUNT).asInteger(); + originalServerAddressList = context.getProperty(ENDPOINT_LIST).getValue(); + final String[] serverAddresses = originalServerAddressList.split(","); + backupServer = context.getProperty(FAILOVER_ENDPOINT).getValue(); + executorService = Executors.newFixedThreadPool(serverAddresses.length); + batchSize = context.getProperty(BATCH_SIZE).asInteger(); + + for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + final PropertyDescriptor descriptor = entry.getKey(); + if (descriptor.isDynamic()) { + dynamicAttributes.put(descriptor.getName(), entry.getValue()); + + } + } --- End diff -- Since dynamic properties are not used anywhere, you might as well remove the above loop
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it to be, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---