[ https://issues.apache.org/jira/browse/NIFI-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16406854#comment-16406854 ]
ASF GitHub Bot commented on NIFI-4035: -------------------------------------- Github user abhinavrohatgi30 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2561#discussion_r175882819 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java --- @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nifi.processors.solr; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.MultiMapSolrParams; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import 
java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord; + + +@Tags({"Apache", "Solr", "Put", "Send","Record"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Indexes the Records from a FlowFile into Solr") +@DynamicProperty(name="A Solr request parameter name", value="A Solr request parameter value", + description="These parameters will be passed to Solr on the request") +public class PutSolrRecord extends SolrProcessor { + + public static final PropertyDescriptor UPDATE_PATH = new PropertyDescriptor + .Builder().name("Solr Update Path") + .description("The path in Solr to post the Flowfile Records") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("/update") + .build(); + + public static 
final PropertyDescriptor FIELDS_TO_INDEX = new PropertyDescriptor + .Builder().name("Fields To Index") + .displayName("Fields To Index") + .description("Comma-separated list of field names to write") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor COMMIT_WITHIN = new PropertyDescriptor + .Builder().name("Commit Within") + .description("The number of milliseconds before the given update is committed") + .required(false) + .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("5000") + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("The original FlowFile") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles that failed for any reason other than Solr being unreachable") + .build(); + + public static final Relationship REL_CONNECTION_FAILURE = new Relationship.Builder() + .name("connection_failure") + .description("FlowFiles that failed because Solr is unreachable") + .build(); + + public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("put-solr-record-record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + public static final String COLLECTION_PARAM_NAME = "collection"; + public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin"; + public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+"; + public final ComponentLog logger = getLogger(); + + private Set<Relationship> relationships; + private List<PropertyDescriptor> descriptors; + + @Override + protected void init(final ProcessorInitializationContext context) { + 
super.init(context); + + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(SOLR_TYPE); + descriptors.add(SOLR_LOCATION); + descriptors.add(COLLECTION); + descriptors.add(UPDATE_PATH); + descriptors.add(RECORD_READER); + descriptors.add(FIELDS_TO_INDEX); + descriptors.add(COMMIT_WITHIN); + descriptors.add(JAAS_CLIENT_APP_NAME); + descriptors.add(BASIC_USERNAME); + descriptors.add(BASIC_PASSWORD); + descriptors.add(SSL_CONTEXT_SERVICE); + descriptors.add(SOLR_SOCKET_TIMEOUT); + descriptors.add(SOLR_CONNECTION_TIMEOUT); + descriptors.add(SOLR_MAX_CONNECTIONS); + descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST); + descriptors.add(ZK_CLIENT_TIMEOUT); + descriptors.add(ZK_CONNECTION_TIMEOUT); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + relationships.add(REL_CONNECTION_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return this.descriptors; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .description("Specifies the value to send for the '" + propertyDescriptorName + "' request parameter") + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .expressionLanguageSupported(true) + .build(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if ( flowFile == null ) { + return; + } + + final AtomicReference<Exception> error = new AtomicReference<>(null); + final AtomicReference<Exception> connectionError 
= new AtomicReference<>(null); + + final boolean isSolrCloud = SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue()); + final String collection = context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue(); + final Long commitWithin = context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong(); + final String contentStreamPath = context.getProperty(UPDATE_PATH).evaluateAttributeExpressions(flowFile).getValue(); + final MultiMapSolrParams requestParams = new MultiMapSolrParams(getRequestParams(context, flowFile)); + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final String fieldsToIndex = context.getProperty(FIELDS_TO_INDEX).getValue(); --- End diff -- Sure, I'll make that change > Implement record-based Solr processors > -------------------------------------- > > Key: NIFI-4035 > URL: https://issues.apache.org/jira/browse/NIFI-4035 > Project: Apache NiFi > Issue Type: Improvement > Affects Versions: 1.2.0, 1.3.0 > Reporter: Bryan Bende > Priority: Minor > > Now that we have record readers and writers, we should implement variants of > the existing Solr processors that are record-based... > Processors to consider: > * PutSolrRecord - uses a configured record reader to read an incoming flow > file and insert records into Solr > * GetSolrRecord - extracts records from Solr and uses a configured record > writer to write them to a flow file -- This message was sent by Atlassian JIRA (v7.6.3#76005)