Github user trixpan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/858#discussion_r76225271
  
    --- Diff: 
nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryWhois.java
 ---
    @@ -0,0 +1,334 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.enrich;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.commons.net.whois.WhoisClient;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@Tags({"whois", "enrich", "ip"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("A powerful whois query processor primary designed 
to enrich DataFlows with whois based APIs " +
    +        "(e.g. ShadowServer's ASN lookup) but that can be also used to 
perform regular whois lookups.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "enrich.dns.record*.group*", 
description = "The captured fields of the Whois query response for each of the 
records received"),
    +})
    +public class QueryWhois extends AbstractEnrichProcessor {
    +
    +    public static final AllowableValue BEGIN_END = new 
AllowableValue("Begin/End", "Begin/End",
    +            "The evaluated input of each flowfile is enclosed within begin 
and end tags. Each row contains a delimited set of fields");
    +
    +    public static final AllowableValue BULK_NONE = new 
AllowableValue("None", "None",
    +            "Queries are made without any particular dialect");
    +
    +
    +    public static final PropertyDescriptor WHOIS_QUERY_TYPE = new 
PropertyDescriptor.Builder()
    +            .name("WHOIS_QUERY_TYPE")
    +            .displayName("Whois Query Type")
    +            .description("The Whois query type to be used by the processor 
(if used)")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor WHOIS_SERVER = new 
PropertyDescriptor.Builder()
    +            .name("WHOIS_SERVER")
    +            .displayName("Whois Server")
    +            .description("The Whois server to be used")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor WHOIS_SERVER_PORT = new 
PropertyDescriptor.Builder()
    +            .name("WHOIS_SERVER_PORT")
    +            .displayName("Whois Server Port")
    +            .description("The TCP port of the remote Whois server")
    +            .required(true)
    +            .defaultValue("43")
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor WHOIS_TIMEOUT = new 
PropertyDescriptor.Builder()
    +            .name("WHOIS_TIMEOUT")
    +            .displayName("Whois Query Timeout")
    +            .description("The amount of time to wait until considering a 
query as failed")
    +            .required(true)
    +            .defaultValue("1500 ms")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("BATCH_SIZE")
    +            .displayName("Batch Size")
    +            .description("The number of incoming FlowFiles to process in a 
single execution of this processor. ")
    +            .required(true)
    +            .defaultValue("25")
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor BULK_PROTOCOL = new 
PropertyDescriptor.Builder()
    +            .name("BULK_PROTOCOL")
    +            .displayName("Bulk Protocol")
    +            .description("The protocol used to perform the bulk query. ")
    +            .required(true)
    +            .defaultValue(BULK_NONE.getValue())
    +            .allowableValues(BEGIN_END, BULK_NONE)
    +            .build();
    +
    +    @Override
    +    public List<ValidationResult> customValidate(ValidationContext 
validationContext) {
    +        final List<ValidationResult> results = new 
ArrayList<>(super.customValidate(validationContext));
    +
    +        final String chosenQUERY_PARSER = 
validationContext.getProperty(QUERY_PARSER).getValue();
    +
    +        if (!chosenQUERY_PARSER.equals(NONE.getValue())  &&  
!validationContext.getProperty(QUERY_PARSER_INPUT).isSet() ) {
    +            results.add(new 
ValidationResult.Builder().input("QUERY_PARSER_INPUT")
    +                    .explanation("Split and Regex parsers require a valid 
Regular Expression")
    +                    .valid(false)
    +                    .build());
    +        }
    +
    +
    +        if (validationContext.getProperty(BATCH_SIZE).asInteger() > 1 &&   
!validationContext.getProperty(KEY_GROUP).isSet() )  {
    +            results.add(new ValidationResult.Builder().input("BATCH_SIZE")
    +                    .explanation("When operating in Batching mode, RegEx 
and Split parsers require a Key lookup group")
    +                    .valid(false)
    +                    .build());
    +        }
    +
    +        if ( validationContext.getProperty(BATCH_SIZE).asInteger() > 1  && 
chosenQUERY_PARSER.equals(NONE.getValue())  ) {
    +            results.add(new 
ValidationResult.Builder().input("QUERY_PARSER")
    +                    .explanation("NONE parser does not support batching. 
Configure Batch Size to 1 or use another parser.")
    +                    .valid(false)
    +                    .build());
    +        }
    +
    +        if ( validationContext.getProperty(BATCH_SIZE).asInteger() == 1  
&& 
!validationContext.getProperty(BULK_PROTOCOL).getValue().equals(BULK_NONE.getValue())
 ) {
    +            results.add(new 
ValidationResult.Builder().input("BULK_PROTOCOL")
    +                    .explanation("Bulk protocol requirement requires 
batching. Configure Batch Size to more than 1 or use another protocol.")
    +                    .valid(false)
    +                    .build());
    +        }
    +
    +
    +
    +        return results;
    +    }
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +    private final static Set<Relationship> relationships;
    +
    +    private WhoisClient whoisClient;
    +
    +    static {
    +        List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(QUERY_INPUT);
    +        props.add(WHOIS_QUERY_TYPE);
    +        props.add(WHOIS_SERVER);
    +        props.add(WHOIS_SERVER_PORT);
    +        props.add(WHOIS_TIMEOUT);
    +        props.add(BATCH_SIZE);
    +        props.add(BULK_PROTOCOL);
    +        props.add(QUERY_PARSER);
    +        props.add(QUERY_PARSER_INPUT);
    +        props.add(KEY_GROUP);
    +        propertyDescriptors = Collections.unmodifiableList(props);
    +
    +        Set<Relationship> rels = new HashSet<>();
    +        rels.add(REL_FOUND);
    +        rels.add(REL_NOT_FOUND);
    +        relationships = Collections.unmodifiableSet(rels);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +
    +        List<FlowFile> flowFiles = session.get(batchSize);
    +
    +        if (flowFiles == null || flowFiles.isEmpty()) {
    +            // 
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
    +            context.yield();
    +            return;
    +        }
    +
    +        // Build query
    +        String buildString = "";
    +        final String queryType = 
context.getProperty(WHOIS_QUERY_TYPE).getValue();
    +
    +        // Verify the the protocol mode and craft the "begin" 
pseudo-command, otherwise just the query type
    +        buildString = 
context.getProperty(BULK_PROTOCOL).getValue().equals("Begin/End")  ? 
buildString.concat("begin") : buildString.concat("");
    +
    +        // Append the query type
    +        buildString = context.getProperty(WHOIS_QUERY_TYPE).isSet()  ? 
buildString.concat(" " + queryType + "\n") : buildString.concat("");
    +
    +        // append the values
    +        for (FlowFile flowFile : flowFiles) {
    +            final String evaluatedInput = 
context.getProperty(QUERY_INPUT).evaluateAttributeExpressions(flowFile).getValue();
    +            buildString = buildString + evaluatedInput + "\n";
    +        }
    +
    +        // Verify the the protocol mode and craft the "end" 
pseudo-command, otherwise just the query type
    +        buildString = 
context.getProperty(BULK_PROTOCOL).getValue().equals("Begin/End")  ? 
buildString.concat("end") : buildString.concat("");
    --- End diff --
    
    addressed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to