[ https://issues.apache.org/jira/browse/NIFI-4002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16033695#comment-16033695 ]
ASF GitHub Bot commented on NIFI-4002: -------------------------------------- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1878#discussion_r119728642 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java --- @@ -0,0 +1,559 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.elasticsearch; + +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.RequestBody; +import okhttp3.Response; +import okhttp3.ResponseBody; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPath; +import org.apache.nifi.record.path.util.RecordPathCache; +import org.apache.nifi.record.path.validation.RecordPathValidator; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.ChoiceDataType; +import 
org.apache.nifi.serialization.record.type.MapDataType; +import org.apache.nifi.serialization.record.type.RecordDataType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StringUtils; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigInteger; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.commons.lang3.StringUtils.trimToEmpty; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http", "record"}) +@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as " + + "the index to insert into and the type of the document.") +public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcessor { + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build(); + + public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") + .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") + 
.build(); + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("put-db-record-record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder() + .name("put-es-record-id-path") + .displayName("Identifier Record Path") + .description("A RecordPath pointing to a field in the record(s) that contains the identifier for the document. If the Index Operation is \"index\", " + + "this property may be left empty or evaluate to an empty value, in which case the document's identifier will be " + + "auto-generated by Elasticsearch. For all other Index Operations, the field's value must be non-empty.") + .required(false) + .addValidator(new RecordPathValidator()) + .expressionLanguageSupported(true) + .build(); + + static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() + .name("put-es-record-index") + .displayName("Index") + .description("The name of the index to insert into") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator( + AttributeExpression.ResultType.STRING, true)) + .build(); + + static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() + .name("put-es-record-type") + .displayName("Type") + .description("The type of this document (used by Elasticsearch for indexing and searching)") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .build(); + + static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder() + .name("put-es-record-index-op") + .displayName("Index Operation") + .description("The type of the operation used to index (index, update, upsert, delete)") + 
.required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .defaultValue("index") + .build(); + + private static final Set<Relationship> relationships; + private static final List<PropertyDescriptor> propertyDescriptors; + + private volatile RecordPathCache recordPathCache; + + private final JsonFactory factory = new JsonFactory(); + + static { + final Set<Relationship> _rels = new HashSet<>(); + _rels.add(REL_SUCCESS); + _rels.add(REL_FAILURE); + _rels.add(REL_RETRY); + relationships = Collections.unmodifiableSet(_rels); + + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(ES_URL); + descriptors.add(PROP_SSL_CONTEXT_SERVICE); + descriptors.add(USERNAME); + descriptors.add(PASSWORD); + descriptors.add(CONNECT_TIMEOUT); + descriptors.add(RESPONSE_TIMEOUT); + descriptors.add(RECORD_READER); + descriptors.add(ID_RECORD_PATH); + descriptors.add(INDEX); + descriptors.add(TYPE); + descriptors.add(INDEX_OP); + + propertyDescriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext)); + // Since Expression Language is allowed for index operation, we can't guarantee that we can catch + // all invalid configurations, but we should catch them as soon as we can. For example, if the + // Identifier Record Path property is empty, the Index Operation must evaluate to "index". 
+ String idPath = validationContext.getProperty(ID_RECORD_PATH).getValue(); + String indexOp = validationContext.getProperty(INDEX_OP).getValue(); + + if (StringUtils.isEmpty(idPath)) { + switch (indexOp.toLowerCase()) { + case "update": + case "upsert": + case "delete": + case "": + problems.add(new ValidationResult.Builder() + .valid(false) + .subject(INDEX_OP.getDisplayName()) + .explanation("If Identifier Record Path is not set, Index Operation must evaluate to \"index\"") + .build()); + break; + default: + break; + } + } + return problems; + } + + @OnScheduled + public void setup(ProcessContext context) { + super.setup(context); + recordPathCache = new RecordPathCache(10); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + + // Authentication + final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue(); + final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions(flowFile).getValue(); + + OkHttpClient okHttpClient = getClient(); + final ComponentLog logger = getLogger(); + + final String baseUrl = trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue()); + final URL url; + try { + url = new URL((baseUrl.endsWith("/") ? 
baseUrl : baseUrl + "/") + "_bulk"); + } catch (MalformedURLException mue) { + // Since we have a URL validator, something has gone very wrong, throw a ProcessException + context.yield(); + throw new ProcessException(mue); + } + + final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue(); + if (StringUtils.isEmpty(index)) { + logger.error("No value for index in for {}, transferring to failure", new Object[]{flowFile}); + session.transfer(flowFile, REL_FAILURE); + return; + } + final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue(); + String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue(); + if (StringUtils.isEmpty(indexOp)) { + logger.error("No Index operation specified for {}, transferring to failure.", new Object[]{flowFile}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + switch (indexOp.toLowerCase()) { + case "index": + case "update": + case "upsert": + case "delete": + break; + default: + logger.error("Index operation {} not supported for {}, transferring to failure.", new Object[]{indexOp, flowFile}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final String id_path = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue(); + final RecordPath recordPath = StringUtils.isEmpty(id_path) ? 
null : recordPathCache.getCompiled(id_path); + final StringBuilder sb = new StringBuilder(); --- End diff -- Good idea, will update > Add PutElasticsearchHttpRecord processor > ---------------------------------------- > > Key: NIFI-4002 > URL: https://issues.apache.org/jira/browse/NIFI-4002 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Reporter: Matt Burgess > Assignee: Matt Burgess > > Given the new Record Reader/Writer capabilities, and since the PutElasticsearch > processors only handle a single document at a time, it would be nice to have > a "record-aware" PutES processor, where the user could specify a Record > Reader, and for each record in the input, the processor would convert the > record to JSON and push it to an ES cluster. One important (though optional) > property would be a Record Path to a field/value to be used as the document > identifier (the current PutES processors require an attribute to hold the > identifier). If the Record Path is not set and the operation is "index", then, > just like the PutES processors, the document identifier will be auto-generated. -- This message was sent by Atlassian JIRA (v6.3.15#6346)