[ https://issues.apache.org/jira/browse/NIFI-4325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16250746#comment-16250746 ]
ASF GitHub Bot commented on NIFI-4325: -------------------------------------- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2113#discussion_r150726684 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch5.java --- @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.elasticsearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.ElasticSearchResponse; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +@WritesAttributes({ + @WritesAttribute(attribute = "mime.type"), + @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"}) +@CapabilityDescription("A processor that allows the user to run a query (with aggregations) written with the " + + "ElasticSearch 
JSON DSL. It currently does not support pagination.") +public class JsonQueryElasticsearch5 extends AbstractProcessor { + public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original") + .description("All original flowfiles that don't cause an error to occur go to this relationship. " + + "This applies even if you select the \"split up hits\" option to send individual hits to the " + + "\"hits\" relationship.").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build(); + + public static final Relationship REL_HITS = new Relationship.Builder().name("hits") + .description("Search hits are routed to this relationship.") + .build(); + + public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations") + .description("Aggregations are routed to this relationship.") + .build(); + + public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() + .name("el5-fetch-index") + .displayName("Index") + .description("The name of the index to read from") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() + .name("el5-type") + .displayName("Type") + .description("The type of this document (used by Elasticsearch for indexing and searching)") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("el5-query") + .displayName("Query") + .description("A query in JSON syntax, not Lucene syntax. 
Ex: " + + "{\n" + + "\t\"query\": {\n" + + "\t\t\"match\": {\n" + + "\t\t\t\"name\": \"John Smith\"\n" + + "\t\t}\n" + + "\t}\n" + + "}") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final AllowableValue SPLIT_UP_YES = new AllowableValue( + "splitUp-yes", + "Yes", + "Split up results." + ); + public static final AllowableValue SPLIT_UP_HITS_NO = new AllowableValue( + "splitUp-no", + "No", + "Don't split up results." + ); + + public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder() + .name("el5-split-up-hits") + .displayName("Split up search results") + .description("Split up search results into one flowfile per result.") + .allowableValues(SPLIT_UP_HITS_NO, SPLIT_UP_YES) + .defaultValue(SPLIT_UP_HITS_NO.getValue()) + .required(true) + .expressionLanguageSupported(false) + .build(); + public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder() + .name("el5-split-up-aggregations") + .displayName("Split up aggregation results") + .description("Split up aggregation results into one flowfile per result.") + .allowableValues(SPLIT_UP_HITS_NO, SPLIT_UP_YES) + .defaultValue(SPLIT_UP_HITS_NO.getValue()) + .required(true) + .expressionLanguageSupported(false) + .build(); + + public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("el5-client-service") + .displayName("Client Service") + .description("An ElasticSearch client service to use for running queries.") + .identifiesControllerService(ElasticSearchClientService.class) + .required(true) + .build(); + + private static final Set<Relationship> relationships; + private static final List<PropertyDescriptor> propertyDescriptors; + + private ElasticSearchClientService clientService; + + static { + final Set<Relationship> _rels = new HashSet<>(); + _rels.add(REL_ORIGINAL); + _rels.add(REL_FAILURE); + _rels.add(REL_HITS); + 
_rels.add(REL_AGGREGATIONS); + relationships = Collections.unmodifiableSet(_rels); + + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(QUERY); + descriptors.add(INDEX); + descriptors.add(TYPE); + descriptors.add(CLIENT_SERVICE); + descriptors.add(SPLIT_UP_HITS); + descriptors.add(SPLIT_UP_AGGREGATIONS); + + propertyDescriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class); + } + + @OnUnscheduled + public void onUnscheduled() { + this.clientService = null; + } + + + private ObjectMapper mapper = new ObjectMapper(); + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + try { + + String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue(); + String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue(); + String type = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue(); + + Optional<ElasticSearchResponse> resp = clientService.search(query, index, type); + ElasticSearchResponse response = resp.get(); + + List<FlowFile> hitsFlowFiles = handleHits(response.getHits(), context, session, flowFile); + List<FlowFile> aggsFlowFiles = handleAggregations(response.getAggregations(), context, session, flowFile); + + if (hitsFlowFiles.size() > 0) { + session.transfer(hitsFlowFiles, REL_HITS); + for (FlowFile ff : hitsFlowFiles) { + session.getProvenanceReporter().create(ff); --- End diff -- Since these are coming from an 
external system, I believe the convention is to use receive() instead of create(). Same goes below for aggregations.

> Create a new ElasticSearch processor that supports the JSON DSL
> ---------------------------------------------------------------
>
> Key: NIFI-4325
> URL: https://issues.apache.org/jira/browse/NIFI-4325
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Mike Thomsen
> Priority: Minor
>
> The existing ElasticSearch processors use the Lucene-style syntax for
> querying, not the JSON DSL. A new processor is needed that can take a full
> JSON query and execute it. It should also support aggregation queries in this
> syntax. A user needs to be able to take a query as-is from Kibana and drop it
> into NiFi and have it just run.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)