Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/2561#discussion_r175858048 --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java --- @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) { } } + /** + * Writes each Record as a SolrInputDocument. + */ + public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument,final List<String> fieldsToIndex) + throws IOException { + RecordSchema schema = record.getSchema(); + + for (int i = 0; i < schema.getFieldCount(); i++) { + final RecordField field = schema.getField(i); + final String fieldName = field.getFieldName(); + final Object value = record.getValue(field); + if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) { + continue; + }else { + final DataType dataType = schema.getDataType(fieldName).get(); + writeValue(inputDocument, value, fieldName, dataType,fieldsToIndex); + } + } + } + private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType,final List<String> fieldsToIndex) throws IOException { + final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? 
DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType; + final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName); + if (coercedValue == null) { + return; + } + + switch (chosenDataType.getFieldType()) { + case DATE: { + final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat())); + if (DataTypeUtils.isLongTypeCompatible(stringValue)) { + LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue); + inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); + } else { + inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); + } + break; + } + case TIMESTAMP: { + final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat())); + if (DataTypeUtils.isLongTypeCompatible(stringValue)) { + LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue); + inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); + } else { + inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z'); + } + break; + } + case DOUBLE: + inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, fieldName)); + break; + case FLOAT: + inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, fieldName)); + break; + case LONG: + inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName)); + break; + case INT: + case BYTE: + case SHORT: + inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, fieldName)); + break; + case CHAR: + case STRING: + inputDocument.addField(fieldName,coercedValue.toString()); + break; + case BIGINT: + if (coercedValue instanceof Long) { + inputDocument.addField(fieldName,(Long) 
coercedValue); + } else { + inputDocument.addField(fieldName,(BigInteger) coercedValue); + } + break; + case BOOLEAN: + final String stringValue = coercedValue.toString(); + if ("true".equalsIgnoreCase(stringValue)) { + inputDocument.addField(fieldName,true); + } else if ("false".equalsIgnoreCase(stringValue)) { + inputDocument.addField(fieldName,false); + } else { + inputDocument.addField(fieldName,stringValue); + } + break; + case RECORD: { --- End diff -- I think the current approach is probably fine for now; maybe we can make a note in the @CapabilityDescription that mentions how nested records are handled, or in the additionalDetails.html you could provide example input and show how it would be handled. Someone can always use ConvertRecord to convert whatever format to JSON, and then use the existing PutSolrContentStream with Solr's splitting if they want to do something like that.
---