Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/2561#discussion_r175809550 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java --- @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) { } } + /** + * Writes each Record as a SolrInputDocument. + */ + public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List<String> fieldsToIndex) + throws IOException { + RecordSchema schema = record.getSchema(); + + for (int i = 0; i < schema.getFieldCount(); i++) { + final RecordField field = schema.getField(i); + final String fieldName = field.getFieldName(); + final Object value = record.getValue(field); + if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) { + continue; + }else { + final DataType dataType = schema.getDataType(fieldName).get(); + writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex); + } + } + } + private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List<String> fieldsToIndex) throws IOException { + final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? 
DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType; + final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName); + if (coercedValue == null) { + return; + } + + switch (chosenDataType.getFieldType()) { + case DATE: { + final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat())); + if (DataTypeUtils.isLongTypeCompatible(stringValue)) { + LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue); + inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); + } else { + inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); + } + break; + } + case TIMESTAMP: { + final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat())); + if (DataTypeUtils.isLongTypeCompatible(stringValue)) { + LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue); + inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); + } else { + inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); + } + break; + } + case DOUBLE: + inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName)); + break; + case FLOAT: + inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName)); + break; + case LONG: + inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName)); + break; + case INT: + case BYTE: + case SHORT: + inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName)); + break; + case CHAR: + case STRING: + inputDocument.addField(fieldName,coercedValue.toString()); + break; + case BIGINT: + if (coercedValue instanceof Long) { + inputDocument.addField(fieldName,(Long) 
coercedValue); + } else { + inputDocument.addField(fieldName,(BigInteger) coercedValue); + } + break; + case BOOLEAN: + final String stringValue = coercedValue.toString(); + if ("true".equalsIgnoreCase(stringValue)) { + inputDocument.addField(fieldName,true); + } else if ("false".equalsIgnoreCase(stringValue)) { + inputDocument.addField(fieldName,false); + } else { + inputDocument.addField(fieldName,stringValue); + } + break; + case RECORD: { --- End diff -- I think we have to handle it since someone can specify a field name in "fields to index" that could be of type record. I think it makes sense to have a property like "Nested Field Names" with choices for "Fully Qualified" and "Child Only" (or something like that). This lines up with how Solr's JSON update works: https://lucene.apache.org/solr/guide/6_6/transforming-and-indexing-custom-json.html#transforming-and-indexing-custom-json The part that shows.... The default behavior is to use the fully qualified name (FQN) of the node. So, if we don't define any field mappings, like this: curl 'http://localhost:8983/solr/my_collection/update/json/docs?split=/exams'\ -H 'Content-type:application/json' -d ' { "first": "John", "last": "Doe", "grade": 8, "exams": [ { "subject": "Maths", "test" : "term1", "marks" : 90}, { "subject": "Biology", "test" : "term1", "marks" : 86} ] }' The indexed documents would be added to the index with fields that look like this: { "first":"John", "last":"Doe", "grade":8, "exams.subject":"Maths", "exams.test":"term1", "exams.marks":90}, { "first":"John", "last":"Doe", "grade":8, "exams.subject":"Biology", "exams.test":"term1", "exams.marks":86}
---