[ 
https://issues.apache.org/jira/browse/NIFI-4325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16403286#comment-16403286
 ] 

ASF GitHub Bot commented on NIFI-4325:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2113#discussion_r175249262
  
    --- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
 ---
    @@ -0,0 +1,326 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.elasticsearch.ElasticSearchClientService;
    +import org.apache.nifi.elasticsearch.SearchResponse;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "mime.type", description = 
"application/json"),
    +    @WritesAttribute(attribute = "aggregation.name", description = "The 
name of the aggregation whose results are in the output flowfile")
    +})
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"})
    +@CapabilityDescription("A processor that allows the user to run a query 
(with aggregations) written with the " +
    +        "ElasticSearch JSON DSL. It currently does not support 
pagination.")
    +public class JsonQueryElasticsearch extends AbstractProcessor {
    +    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original")
    +            .description("All original flowfiles that don't cause an error 
to occur go to this relationship. " +
    +                    "This applies even if you select the \"split up hits\" 
option to send individual hits to the " +
    +                    "\"hits\" relationship.").build();
    +
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
    +            .description("All FlowFiles that cannot be read from 
Elasticsearch are routed to this relationship").build();
    +
    +    public static final Relationship REL_HITS = new 
Relationship.Builder().name("hits")
    +            .description("Search hits are routed to this relationship.")
    +            .build();
    +
    +    public static final Relationship REL_AGGREGATIONS = new 
Relationship.Builder().name("aggregations")
    +            .description("Aggregations are routed to this relationship.")
    +            .build();
    +
    +    public static final PropertyDescriptor INDEX = new 
PropertyDescriptor.Builder()
    +            .name("el-rest-fetch-index")
    +            .displayName("Index")
    +            .description("The name of the index to read from")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TYPE = new 
PropertyDescriptor.Builder()
    +            .name("el-rest-type")
    +            .displayName("Type")
    +            .description("The type of this document (used by Elasticsearch 
for indexing and searching)")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
    +            .name("el-rest-query")
    +            .displayName("Query")
    +            .description("A query in JSON syntax, not Lucene syntax. Ex: " 
+
    +                    "{\n" +
    +                    "\t\"query\": {\n" +
    +                    "\t\t\"match\": {\n" +
    +                    "\t\t\t\"name\": \"John Smith\"\n" +
    +                    "\t\t}\n" +
    +                    "\t}\n" +
    +                    "}")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor QUERY_ATTRIBUTE = new 
PropertyDescriptor.Builder()
    +            .name("el-query-attribute")
    +            .displayName("Query Attribute")
    +            .description("If set, the executed query will be set on each 
result flowfile in the specified attribute.")
    +            .expressionLanguageSupported(true)
    +            .addValidator(Validator.VALID)
    +            .required(false)
    +            .build();
    +
    +    public static final AllowableValue SPLIT_UP_YES = new AllowableValue(
    +        "splitUp-yes",
    +        "Yes",
    +        "Split up results."
    +    );
    +    public static final AllowableValue SPLIT_UP_HITS_NO = new 
AllowableValue(
    +        "splitUp-no",
    +        "No",
    +        "Don't split up results."
    +    );
    +
    +    public static final PropertyDescriptor SPLIT_UP_HITS = new 
PropertyDescriptor.Builder()
    +            .name("el-rest-split-up-hits")
    +            .displayName("Split up search results")
    +            .description("Split up search results into one flowfile per 
result.")
    +            .allowableValues(SPLIT_UP_HITS_NO, SPLIT_UP_YES)
    +            .defaultValue(SPLIT_UP_HITS_NO.getValue())
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .build();
    +    public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new 
PropertyDescriptor.Builder()
    +            .name("el-rest-split-up-aggregations")
    +            .displayName("Split up aggregation results")
    +            .description("Split up aggregation results into one flowfile 
per result.")
    +            .allowableValues(SPLIT_UP_HITS_NO, SPLIT_UP_YES)
    +            .defaultValue(SPLIT_UP_HITS_NO.getValue())
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .build();
    +
    +    public static final PropertyDescriptor CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
    +            .name("el-rest-client-service")
    +            .displayName("Client Service")
    +            .description("An ElasticSearch client service to use for 
running queries.")
    +            .identifiesControllerService(ElasticSearchClientService.class)
    +            .required(true)
    +            .build();
    +
    +    private static final Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +
    +    private ElasticSearchClientService clientService;
    +
    +    static {
    +        final Set<Relationship> _rels = new HashSet<>();
    +        _rels.add(REL_ORIGINAL);
    +        _rels.add(REL_FAILURE);
    +        _rels.add(REL_HITS);
    +        _rels.add(REL_AGGREGATIONS);
    +        relationships = Collections.unmodifiableSet(_rels);
    +
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(QUERY);
    +        descriptors.add(QUERY_ATTRIBUTE);
    +        descriptors.add(INDEX);
    +        descriptors.add(TYPE);
    +        descriptors.add(CLIENT_SERVICE);
    +        descriptors.add(SPLIT_UP_HITS);
    +        descriptors.add(SPLIT_UP_AGGREGATIONS);
    +
    +        propertyDescriptors = Collections.unmodifiableList(descriptors);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        clientService = 
context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    +    }
    +
    +    @OnUnscheduled
    +    public void onUnscheduled() {
    +        this.clientService = null;
    --- End diff --
    
    This should be changed from "OnUnscheduled"[1] to "OnStopped"[2]. 
"OnUnscheduled" will be run as soon as the user clicks stop. This stops the 
processor from being schedueld again but there could still be lingering tasks 
running (thus leading to null pointer exceptions when it tries to use the 
clientService). "OnStopped" is run once all tasks finish.
    
    [1] 
https://github.com/apache/nifi/blob/master/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java
    [2] 
https://github.com/apache/nifi/blob/master/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnStopped.java


> Create a new ElasticSearch processor that supports the JSON DSL
> ---------------------------------------------------------------
>
>                 Key: NIFI-4325
>                 URL: https://issues.apache.org/jira/browse/NIFI-4325
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Mike Thomsen
>            Priority: Minor
>
> The existing ElasticSearch processors use the Lucene-style syntax for 
> querying, not the JSON DSL. A new processor is needed that can take a full 
> JSON query and execute it. It should also support aggregation queries in this 
> syntax. A user needs to be able to take a query as-is from Kibana and drop it 
> into NiFi and have it just run.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to