Github user abhinavrohatgi30 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175801577
  
    --- Diff: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java ---
    @@ -280,5 +291,115 @@ public SolrInputDocument toSolrInputDocument(SolrDocument d) {
             }
         }
     
    +    /**
    +     * Writes each Record as a SolrInputDocument.
    +     */
    +    public static void writeRecord(final Record record, final RecordSchema writeSchema, final SolrInputDocument inputDocument, final List<String> fieldsToIndex)
    +            throws IOException {
    +        RecordSchema schema = record.getSchema();
    +
    +        for (int i = 0; i < schema.getFieldCount(); i++) {
    +            final RecordField field = schema.getField(i);
    +            final String fieldName = field.getFieldName();
    +            final Object value = record.getValue(field);
    +            if (value == null || (!fieldsToIndex.isEmpty() && !fieldsToIndex.contains(fieldName))) {
    +                continue;
    +            } else {
    +                final DataType dataType = schema.getDataType(fieldName).get();
    +                writeValue(inputDocument, value, fieldName, dataType, fieldsToIndex);
    +            }
    +        }
    +    }
     
    +    private static void writeValue(final SolrInputDocument inputDocument, final Object value, final String fieldName, final DataType dataType, final List<String> fieldsToIndex) throws IOException {
    +        final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
    +        final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, fieldName);
    +        if (coercedValue == null) {
    +            return;
    +        }
    +
    +        switch (chosenDataType.getFieldType()) {
    +            case DATE: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDate localDate = getLocalDateFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName, localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + 'Z');
    +                } else {
    +                    inputDocument.addField(fieldName, LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + 'Z');
    +                }
    +                break;
    +            }
    +            case TIMESTAMP: {
    +                final String stringValue = DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDateTime localDateTime = getLocalDateTimeFromEpochTime(fieldName, coercedValue);
    +                    inputDocument.addField(fieldName, localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + 'Z');
    +                } else {
    +                    inputDocument.addField(fieldName, LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + 'Z');
    +                }
    +                break;
    +            }
    +            case DOUBLE:
    +                inputDocument.addField(fieldName, DataTypeUtils.toDouble(coercedValue, fieldName));
    +                break;
    +            case FLOAT:
    +                inputDocument.addField(fieldName, DataTypeUtils.toFloat(coercedValue, fieldName));
    +                break;
    +            case LONG:
    +                inputDocument.addField(fieldName, DataTypeUtils.toLong(coercedValue, fieldName));
    +                break;
    +            case INT:
    +            case BYTE:
    +            case SHORT:
    +                inputDocument.addField(fieldName, DataTypeUtils.toInteger(coercedValue, fieldName));
    +                break;
    +            case CHAR:
    +            case STRING:
    +                inputDocument.addField(fieldName, coercedValue.toString());
    +                break;
    +            case BIGINT:
    +                if (coercedValue instanceof Long) {
    +                    inputDocument.addField(fieldName, (Long) coercedValue);
    +                } else {
    +                    inputDocument.addField(fieldName, (BigInteger) coercedValue);
    +                }
    +                break;
    +            case BOOLEAN:
    +                final String stringValue = coercedValue.toString();
    +                if ("true".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName, true);
    +                } else if ("false".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName, false);
    +                } else {
    +                    inputDocument.addField(fieldName, stringValue);
    +                }
    +                break;
    +            case RECORD: {
    --- End diff --
    
    Yes, I've implemented it to flatten the nested record into a single Solr document,
    although I wasn't sure whether nested records needed to be handled at all. If they do,
    I'll change the field names as you suggested so that it's clear which nested record
    each field came from.
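    
    For illustration, something along these lines is what I have in mind for the RECORD case, as a helper inside SolrUtils reusing its existing imports (just a rough sketch; the underscore separator, the helper name and the recursive handling of deeper nesting are assumptions for the example, not the final implementation):
    
        // Hypothetical helper: flatten a nested Record into the parent SolrInputDocument,
        // prefixing each child field name with the parent field name so it stays clear
        // where the value came from (e.g. "address" + "city" -> "address_city").
        private static void writeNestedRecord(final SolrInputDocument inputDocument, final Record nested,
                final String parentFieldName, final List<String> fieldsToIndex) {
            for (final RecordField childField : nested.getSchema().getFields()) {
                final Object childValue = nested.getValue(childField);
                if (childValue == null) {
                    continue;
                }
                final String flattenedName = parentFieldName + "_" + childField.getFieldName();
                if (childValue instanceof Record) {
                    // Recurse for deeper nesting, carrying the accumulated prefix along.
                    writeNestedRecord(inputDocument, (Record) childValue, flattenedName, fieldsToIndex);
                } else if (fieldsToIndex.isEmpty() || fieldsToIndex.contains(flattenedName)) {
                    inputDocument.addField(flattenedName, childValue);
                }
            }
        }
    
    The RECORD branch in writeValue would then just delegate with something like writeNestedRecord(inputDocument, (Record) coercedValue, fieldName, fieldsToIndex).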

