Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175810746
  
    --- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
 ---
    @@ -280,5 +291,115 @@ public SolrInputDocument 
toSolrInputDocument(SolrDocument d) {
             }
         }
     
    +    /**
    +     * Writes each Record as a SolrInputDocument.
    +     */
    +    public static void writeRecord(final Record record, final RecordSchema 
writeSchema, final SolrInputDocument inputDocument,final List<String> 
fieldsToIndex)
    +            throws IOException {
    +        RecordSchema schema = record.getSchema();
    +
    +        for (int i = 0; i < schema.getFieldCount(); i++) {
    +            final RecordField field = schema.getField(i);
    +            final String fieldName = field.getFieldName();
    +            final Object value = record.getValue(field);
    +            if (value == null || (!fieldsToIndex.isEmpty() && 
!fieldsToIndex.contains(fieldName))) {
    +                continue;
    +            }else {
    +                final DataType dataType = 
schema.getDataType(fieldName).get();
    +                writeValue(inputDocument, value, fieldName, 
dataType,fieldsToIndex);
    +            }
    +        }
    +    }
     
    +    private static void writeValue(final SolrInputDocument inputDocument, 
final Object value, final String fieldName, final DataType dataType,final 
List<String> fieldsToIndex) throws IOException {
    +        final DataType chosenDataType = dataType.getFieldType() == 
RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) 
dataType) : dataType;
    +        final Object coercedValue = DataTypeUtils.convertType(value, 
chosenDataType, fieldName);
    +        if (coercedValue == null) {
    +            return;
    +        }
    +
    +        switch (chosenDataType.getFieldType()) {
    +            case DATE: {
    +                final String stringValue = 
DataTypeUtils.toString(coercedValue, () -> 
DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDate localDate = 
getLocalDateFromEpochTime(fieldName, coercedValue);
    +                    
inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    
inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case TIMESTAMP: {
    +                final String stringValue = 
DataTypeUtils.toString(coercedValue, () -> 
DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDateTime localDateTime = 
getLocalDateTimeFromEpochTime(fieldName, coercedValue);
    +                    
inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    
inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case DOUBLE:
    +                
inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, 
fieldName));
    +                break;
    +            case FLOAT:
    +                
inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, 
fieldName));
    +                break;
    +            case LONG:
    +                
inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName));
    +                break;
    +            case INT:
    +            case BYTE:
    +            case SHORT:
    +                
inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, 
fieldName));
    +                break;
    +            case CHAR:
    +            case STRING:
    +                inputDocument.addField(fieldName,coercedValue.toString());
    +                break;
    +            case BIGINT:
    +                if (coercedValue instanceof Long) {
    +                    inputDocument.addField(fieldName,(Long) coercedValue);
    +                } else {
    +                    inputDocument.addField(fieldName,(BigInteger) 
coercedValue);
    +                }
    +                break;
    +            case BOOLEAN:
    +                final String stringValue = coercedValue.toString();
    +                if ("true".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,true);
    +                } else if ("false".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,false);
    +                } else {
    +                    inputDocument.addField(fieldName,stringValue);
    +                }
    +                break;
    +            case RECORD: {
    --- End diff --
    
    This also brings up another scenario... what do we do if there is an array 
field, and the type of the elements in the array is a record?
    
    That would be similar to the "exams" array in the above example. With 
Solr's JSON update handler you would have to say split=/exams and this produces 
a Solr document for each exam. 
    
    I'm actually not sure what Solr does if you left off the split param 
because then you would have multiple fields with the same name in the same 
document, like exams.subject would be there twice.


---

Reply via email to