[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on the issue: https://github.com/apache/nifi/pull/2561 @bbende @MikeThomsen Thanks for reviewing the pull request ---
[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on the issue: https://github.com/apache/nifi/pull/2561 I'm really sorry, it might take a while, I'm on a vacation and away from my workstation. I'll keep you updated as soon as I am back. ---
[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on the issue: https://github.com/apache/nifi/pull/2561 Hi, I've looked at the comments and I've made the following changes as part of the latest commit that cover all the comments : 1. Fixed the issue with Nested Records (The issue came up because of the change in field names in the previous commit) 2. Fixed the issue with Array of Records (It was generating an Object[] as opposed to a Record[] that I was expecting and as a result was storing the string representation of a Record) 3. Trimming field names individually 4. Adding Test cases for Nested Record, Array of Record and Record Parser failure 5. Using the getLogger() later in the code 6. Wrapping the Jsons in the additionalDetails.html in a tag I hope the processor now works as expected, let me know if any further changes are to be made Thanks ---
[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on the issue: https://github.com/apache/nifi/pull/2561 @bbende I'll have a look at this and write test cases accordingly. ---
[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on the issue: https://github.com/apache/nifi/pull/2561 Hi @bbende , I've brought it down to a single commit, can you have a look at it now? ---
[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on the issue: https://github.com/apache/nifi/pull/2561 I'm done with the changes that @bbende and @MikeThomsen have suggested ---
[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2561#discussion_r176208789 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java --- @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.MultiMapSolrParams; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord; + + +@Tags({"Apache", "Solr", "Put", "Send","Record"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Indexes the Records from a FlowFile into Solr") +@DynamicPrope
[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2561#discussion_r176208228 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java --- @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) { } } +/** + * Writes each Record as a SolrInputDocument. + */ +public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List fieldsToIndex) +throws IOException { +RecordSchema schema = record.getSchema(); + +for (int i = 0; i < schema.getFieldCount(); i++) { +final RecordField field = schema.getField(i); +final String fieldName = field.getFieldName(); +final Object value = record.getValue(field); +if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) { +continue; +}else { +final DataType dataType = schema.getDataType(fieldName).get(); +writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex); +} +} +} +private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List fieldsToIndex) throws IOException { +final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType; +final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName); +if (coercedValue == null) { +return; +} + +switch (chosenDataType.getFieldType()) { +case DATE: { +final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat())); +if (DataTypeUtils.isLongTypeCompatible(stringValue)) { +LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue); + inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} else { + inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} +break; +} +case TIMESTAMP: { +final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat())); +if (DataTypeUtils.isLongTypeCompatible(stringValue)) { +LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue); + inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} else { + inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} +break; +} +case DOUBLE: + inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName)); +break; +case FLOAT: + inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName)); +break; +case LONG: + inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName)); +break; +case INT: +case BYTE: +case SHORT: + inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName)); +break; +case CHAR: +case STRING: +inputDocument.addField(fieldName,coercedValue.toString()); +break; +case BIGINT: +if (coercedValue instanceof Long) { +inputDocument.addField(fieldName,(Long) coercedValue); +} else { +inputDocument.addField(fieldName,(BigInteger) coercedValue); +} +break; +case BOOLEAN: +final String stringValue = coercedValue.toString(); +if ("true".equalsIgnoreCase(stringValue)) { +inputDocument.addField(fieldName,true);
[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2561#discussion_r176207854 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java --- @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.MultiMapSolrParams; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord; + + +@Tags({"Apache", "Solr", "Put", "Send","Record"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Indexes the Records from a FlowFile into Solr") +@DynamicPrope
[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2561#discussion_r176206589 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java --- @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.MultiMapSolrParams; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord; + + +@Tags({"Apache", "Solr", "Put", "Send","Record"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Indexes the Records from a FlowFile into Solr") +@DynamicPrope
[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on the issue: https://github.com/apache/nifi/pull/2561 Sorry, instead of doing the force push i resolved conflicts and did a push, can i now do the rebase again on the current commit or will i have to add a new commit inorder to rebase from the master branch? ---
[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2561#discussion_r175883171 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java --- @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) { } } +/** + * Writes each Record as a SolrInputDocument. + */ +public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List fieldsToIndex) +throws IOException { +RecordSchema schema = record.getSchema(); + +for (int i = 0; i < schema.getFieldCount(); i++) { +final RecordField field = schema.getField(i); +final String fieldName = field.getFieldName(); +final Object value = record.getValue(field); +if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) { +continue; +}else { +final DataType dataType = schema.getDataType(fieldName).get(); +writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex); +} +} +} +private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List fieldsToIndex) throws IOException { +final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType; +final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName); +if (coercedValue == null) { +return; +} + +switch (chosenDataType.getFieldType()) { +case DATE: { +final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat())); +if (DataTypeUtils.isLongTypeCompatible(stringValue)) { +LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue); + inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} else { + inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} +break; +} +case TIMESTAMP: { +final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat())); +if (DataTypeUtils.isLongTypeCompatible(stringValue)) { +LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue); + inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} else { + inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} +break; +} +case DOUBLE: + inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName)); +break; +case FLOAT: + inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName)); +break; +case LONG: + inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName)); +break; +case INT: +case BYTE: +case SHORT: + inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName)); +break; +case CHAR: +case STRING: +inputDocument.addField(fieldName,coercedValue.toString()); +break; +case BIGINT: +if (coercedValue instanceof Long) { +inputDocument.addField(fieldName,(Long) coercedValue); +} else { +inputDocument.addField(fieldName,(BigInteger) coercedValue); +} +break; +case BOOLEAN: +final String stringValue = coercedValue.toString(); +if ("true".equalsIgnoreCase(stringValue)) { +inputDocument.addField(fieldName,true);
[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2561#discussion_r175882819 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java --- @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.MultiMapSolrParams; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord; + + +@Tags({"Apache", "Solr", "Put", "Send","Record"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Indexes the Records from a FlowFile into Solr") +@DynamicPrope
[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2561#discussion_r175840062 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java --- @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) { } } +/** + * Writes each Record as a SolrInputDocument. + */ +public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List fieldsToIndex) +throws IOException { +RecordSchema schema = record.getSchema(); + +for (int i = 0; i < schema.getFieldCount(); i++) { +final RecordField field = schema.getField(i); +final String fieldName = field.getFieldName(); +final Object value = record.getValue(field); +if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) { +continue; +}else { +final DataType dataType = schema.getDataType(fieldName).get(); +writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex); +} +} +} +private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List fieldsToIndex) throws IOException { +final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType; +final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName); +if (coercedValue == null) { +return; +} + +switch (chosenDataType.getFieldType()) { +case DATE: { +final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat())); +if (DataTypeUtils.isLongTypeCompatible(stringValue)) { +LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue); + inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} else { + inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} +break; +} +case TIMESTAMP: { +final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat())); +if (DataTypeUtils.isLongTypeCompatible(stringValue)) { +LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue); + inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} else { + inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} +break; +} +case DOUBLE: + inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName)); +break; +case FLOAT: + inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName)); +break; +case LONG: + inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName)); +break; +case INT: +case BYTE: +case SHORT: + inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName)); +break; +case CHAR: +case STRING: +inputDocument.addField(fieldName,coercedValue.toString()); +break; +case BIGINT: +if (coercedValue instanceof Long) { +inputDocument.addField(fieldName,(Long) coercedValue); +} else { +inputDocument.addField(fieldName,(BigInteger) coercedValue); +} +break; +case BOOLEAN: +final String stringValue = coercedValue.toString(); +if ("true".equalsIgnoreCase(stringValue)) { +inputDocument.addField(fieldName,true);
[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2561#discussion_r175839474 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java --- @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) { } } +/** + * Writes each Record as a SolrInputDocument. + */ +public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List fieldsToIndex) +throws IOException { +RecordSchema schema = record.getSchema(); + +for (int i = 0; i < schema.getFieldCount(); i++) { +final RecordField field = schema.getField(i); +final String fieldName = field.getFieldName(); +final Object value = record.getValue(field); +if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) { +continue; +}else { +final DataType dataType = schema.getDataType(fieldName).get(); +writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex); +} +} +} +private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List fieldsToIndex) throws IOException { +final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType; +final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName); +if (coercedValue == null) { +return; +} + +switch (chosenDataType.getFieldType()) { +case DATE: { +final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat())); +if (DataTypeUtils.isLongTypeCompatible(stringValue)) { +LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue); + inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} else { + inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} +break; +} +case TIMESTAMP: { +final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat())); +if (DataTypeUtils.isLongTypeCompatible(stringValue)) { +LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue); + inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} else { + inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} +break; +} +case DOUBLE: + inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName)); +break; +case FLOAT: + inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName)); +break; +case LONG: + inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName)); +break; +case INT: +case BYTE: +case SHORT: + inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName)); +break; +case CHAR: +case STRING: +inputDocument.addField(fieldName,coercedValue.toString()); +break; +case BIGINT: +if (coercedValue instanceof Long) { +inputDocument.addField(fieldName,(Long) coercedValue); +} else { +inputDocument.addField(fieldName,(BigInteger) coercedValue); +} +break; +case BOOLEAN: +final String stringValue = coercedValue.toString(); +if ("true".equalsIgnoreCase(stringValue)) { +inputDocument.addField(fieldName,true);
[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2561#discussion_r175816508 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java --- @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) { } } +/** + * Writes each Record as a SolrInputDocument. + */ +public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List fieldsToIndex) +throws IOException { +RecordSchema schema = record.getSchema(); + +for (int i = 0; i < schema.getFieldCount(); i++) { +final RecordField field = schema.getField(i); +final String fieldName = field.getFieldName(); +final Object value = record.getValue(field); +if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) { +continue; +}else { +final DataType dataType = schema.getDataType(fieldName).get(); +writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex); +} +} +} +private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List fieldsToIndex) throws IOException { +final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType; +final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName); +if (coercedValue == null) { +return; +} + +switch (chosenDataType.getFieldType()) { +case DATE: { +final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat())); +if (DataTypeUtils.isLongTypeCompatible(stringValue)) { +LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue); + inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} else { + inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} +break; +} +case TIMESTAMP: { +final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat())); +if (DataTypeUtils.isLongTypeCompatible(stringValue)) { +LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue); + inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} else { + inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} +break; +} +case DOUBLE: + inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName)); +break; +case FLOAT: + inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName)); +break; +case LONG: + inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName)); +break; +case INT: +case BYTE: +case SHORT: + inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName)); +break; +case CHAR: +case STRING: +inputDocument.addField(fieldName,coercedValue.toString()); +break; +case BIGINT: +if (coercedValue instanceof Long) { +inputDocument.addField(fieldName,(Long) coercedValue); +} else { +inputDocument.addField(fieldName,(BigInteger) coercedValue); +} +break; +case BOOLEAN: +final String stringValue = coercedValue.toString(); +if ("true".equalsIgnoreCase(stringValue)) { +inputDocument.addField(fieldName,true);
[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2561#discussion_r175801577 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java --- @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) { } } +/** + * Writes each Record as a SolrInputDocument. + */ +public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List fieldsToIndex) +throws IOException { +RecordSchema schema = record.getSchema(); + +for (int i = 0; i < schema.getFieldCount(); i++) { +final RecordField field = schema.getField(i); +final String fieldName = field.getFieldName(); +final Object value = record.getValue(field); +if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) { +continue; +}else { +final DataType dataType = schema.getDataType(fieldName).get(); +writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex); +} +} +} +private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List fieldsToIndex) throws IOException { +final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType; +final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName); +if (coercedValue == null) { +return; +} + +switch (chosenDataType.getFieldType()) { +case DATE: { +final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat())); +if (DataTypeUtils.isLongTypeCompatible(stringValue)) { +LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue); + inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} else { + inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} +break; +} +case TIMESTAMP: { +final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat())); +if (DataTypeUtils.isLongTypeCompatible(stringValue)) { +LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue); + inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} else { + inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); +} +break; +} +case DOUBLE: + inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName)); +break; +case FLOAT: + inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName)); +break; +case LONG: + inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName)); +break; +case INT: +case BYTE: +case SHORT: + inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName)); +break; +case CHAR: +case STRING: +inputDocument.addField(fieldName,coercedValue.toString()); +break; +case BIGINT: +if (coercedValue instanceof Long) { +inputDocument.addField(fieldName,(Long) coercedValue); +} else { +inputDocument.addField(fieldName,(BigInteger) coercedValue); +} +break; +case BOOLEAN: +final String stringValue = coercedValue.toString(); +if ("true".equalsIgnoreCase(stringValue)) { +inputDocument.addField(fieldName,true);
[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2561#discussion_r175799302 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java --- @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.MultiMapSolrParams; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord; + + +@Tags({"Apache", "Solr", "Put", "Send","Record"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Indexes the Records from a FlowFile into Solr") +@DynamicPrope
[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2561#discussion_r175797163 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/util/MockRecordParser.java --- @@ -0,0 +1,105 @@ +/* --- End diff -- Sure, I'll look into these tests. ---
[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2561#discussion_r175796878 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java --- @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.solr; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.MultiMapSolrParams; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD; +import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME; +import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION; +import static org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE; +import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD; +import static org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT; +import static org.apache.nifi.processors.solr.SolrUtils.writeRecord; + + +@Tags({"Apache", "Solr", "Put", "Send","Record"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Indexes the Records from a FlowFile into Solr") +@DynamicPrope
[GitHub] nifi issue #2561: NIFI-4035 Implement record-based Solr processors
Github user abhinavrohatgi30 commented on the issue: https://github.com/apache/nifi/pull/2561 NIFI-4035 Adding a PutSolrRecord Processor that reads NiFi Records and indexes them into Solr as SolrDocuments. ---
[GitHub] nifi pull request #2561: NIFI-4035 Implement record-based Solr processors
GitHub user abhinavrohatgi30 opened a pull request: https://github.com/apache/nifi/pull/2561 NIFI-4035 Implement record-based Solr processors Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/abhinavrohatgi30/nifi nifi-4035 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2561.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2561 commit 4532645294a225b5b3cde6cf59254a3a38ca15f6 Author: abhinavrohatgi30 Date: 2018-03-17T15:35:06Z Adding PutSolrRecord Processor that reads NiFi records and indexes them into Solr as SolrDocuments commit 313a95ef59f5fff31c6bd9a032bc4d82de7df2f9 Author: abhinavrohatgi30 Date: 2018-03-17T15:36:04Z Adding Test Cases for PutSolrRecord Processor commit 76003a1b1ef5449ee3cbd51b244dc27e946b5ea3 Author: abhinavrohatgi30 Date: 2018-03-17T15:36:58Z Adding PutSolrRecord Processor in the list of Processors ---