[ https://issues.apache.org/jira/browse/NIFI-4002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16033694#comment-16033694 ]

ASF GitHub Bot commented on NIFI-4002:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1878#discussion_r119728626
  
    --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java ---
    @@ -0,0 +1,559 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.elasticsearch;
    +
    +import okhttp3.MediaType;
    +import okhttp3.OkHttpClient;
    +import okhttp3.RequestBody;
    +import okhttp3.Response;
    +import okhttp3.ResponseBody;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.expression.AttributeExpression;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.record.path.FieldValue;
    +import org.apache.nifi.record.path.RecordPath;
    +import org.apache.nifi.record.path.util.RecordPathCache;
    +import org.apache.nifi.record.path.validation.RecordPathValidator;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.record.DataType;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.ArrayDataType;
    +import org.apache.nifi.serialization.record.type.ChoiceDataType;
    +import org.apache.nifi.serialization.record.type.MapDataType;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +import org.apache.nifi.util.StringUtils;
    +import org.codehaus.jackson.JsonFactory;
    +import org.codehaus.jackson.JsonGenerator;
    +import org.codehaus.jackson.JsonNode;
    +import org.codehaus.jackson.node.ArrayNode;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.math.BigInteger;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +import static org.apache.commons.lang3.StringUtils.trimToEmpty;
    +
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@EventDriven
    +@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", 
"put", "http", "record"})
    +@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as "
    +        + "the index to insert into and the type of the document.")
    +public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcessor {
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
    +            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
    +            .build();
    +
    +    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("put-db-record-record-reader")
    --- End diff ---
    
    Yeah definitely, just a copy-paste thing :)
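
    For context, the name flagged above was copied over from PutDatabaseRecord. A minimal sketch of how the descriptor might look once renamed; the identifier "put-es-record-record-reader", the display name, and the description text here are assumptions for illustration, not necessarily what the PR ends up using:

        // Sketch only: assumed rename of the copied property name; the final
        // identifier and display name in the merged PR may differ.
        static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
                .name("put-es-record-record-reader")   // was "put-db-record-record-reader"
                .displayName("Record Reader")
                .description("The Record Reader to use for parsing the incoming FlowFile into Records")
                .identifiesControllerService(RecordReaderFactory.class)
                .required(true)
                .build();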


> Add PutElasticsearchHttpRecord processor
> ----------------------------------------
>
>                 Key: NIFI-4002
>                 URL: https://issues.apache.org/jira/browse/NIFI-4002
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>
> With the new Record Reader/Writer capabilities, and given that the existing 
> PutElasticsearch processors only handle a single document at a time, it would 
> be nice to have a "record-aware" PutES processor: the user specifies a Record 
> Reader and, for each record in the input, the processor converts the record 
> to JSON and pushes it to an Elasticsearch cluster. One new (but optional) 
> property would be a Record Path pointing to the field whose value is used as 
> the document identifier (the current PutES processors require an attribute to 
> hold the identifier). If the Record Path is not set and the operation is 
> "index", then, just like the PutES processors, the document identifier will 
> be autogenerated.
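
The per-record flow described above can be sketched roughly as follows. This is an illustration only, not code from the PR: the field names (readerFactory, recordPathCache, idRecordPath) and the two helpers at the end are assumed for the example, and the HTTP/bulk-request handling of the actual processor is elided.

    // Sketch of the per-record loop: read each record, optionally resolve the
    // document id via a RecordPath, convert to JSON, and queue it for indexing.
    private void indexRecords(final ProcessSession session, final FlowFile flowFile,
                              final String index, final String docType) throws Exception {
        try (final InputStream in = session.read(flowFile);
             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
            final RecordPath idPath = (idRecordPath == null) ? null : recordPathCache.getCompiled(idRecordPath);
            Record record;
            while ((record = reader.nextRecord()) != null) {
                String id = null;
                if (idPath != null) {
                    final Optional<FieldValue> idField = idPath.evaluate(record).getSelectedFields().findFirst();
                    if (idField.isPresent() && idField.get().getValue() != null) {
                        id = idField.get().getValue().toString();
                    }
                }
                // With no id and an "index" operation, Elasticsearch autogenerates the identifier.
                final String json = convertRecordToJson(record);      // hypothetical helper
                addToBulkRequest(index, docType, id, json);           // hypothetical helper
            }
        }
    }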



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
