[ 
https://issues.apache.org/jira/browse/NIFI-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16406332#comment-16406332
 ] 

ASF GitHub Bot commented on NIFI-4035:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2561#discussion_r175764796
  
    --- Diff: 
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrUtils.java
 ---
    @@ -280,5 +291,115 @@ public SolrInputDocument 
toSolrInputDocument(SolrDocument d) {
             }
         }
     
    +    /**
    +     * Writes each Record as a SolrInputDocument.
    +     */
    +    public static void writeRecord(final Record record, final RecordSchema 
writeSchema, final SolrInputDocument inputDocument,final List<String> 
fieldsToIndex)
    +            throws IOException {
    +        RecordSchema schema = record.getSchema();
    +
    +        for (int i = 0; i < schema.getFieldCount(); i++) {
    +            final RecordField field = schema.getField(i);
    +            final String fieldName = field.getFieldName();
    +            final Object value = record.getValue(field);
    +            if (value == null || (!fieldsToIndex.isEmpty() && 
!fieldsToIndex.contains(fieldName))) {
    +                continue;
    +            }else {
    +                final DataType dataType = 
schema.getDataType(fieldName).get();
    +                writeValue(inputDocument, value, fieldName, 
dataType,fieldsToIndex);
    +            }
    +        }
    +    }
     
    +    private static void writeValue(final SolrInputDocument inputDocument, 
final Object value, final String fieldName, final DataType dataType,final 
List<String> fieldsToIndex) throws IOException {
    +        final DataType chosenDataType = dataType.getFieldType() == 
RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) 
dataType) : dataType;
    +        final Object coercedValue = DataTypeUtils.convertType(value, 
chosenDataType, fieldName);
    +        if (coercedValue == null) {
    +            return;
    +        }
    +
    +        switch (chosenDataType.getFieldType()) {
    +            case DATE: {
    +                final String stringValue = 
DataTypeUtils.toString(coercedValue, () -> 
DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDate localDate = 
getLocalDateFromEpochTime(fieldName, coercedValue);
    +                    
inputDocument.addField(fieldName,localDate.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    
inputDocument.addField(fieldName,LocalDate.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case TIMESTAMP: {
    +                final String stringValue = 
DataTypeUtils.toString(coercedValue, () -> 
DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
    +                if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
    +                    LocalDateTime localDateTime = 
getLocalDateTimeFromEpochTime(fieldName, coercedValue);
    +                    
inputDocument.addField(fieldName,localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                } else {
    +                    
inputDocument.addField(fieldName,LocalDateTime.parse(stringValue).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+'Z');
    +                }
    +                break;
    +            }
    +            case DOUBLE:
    +                
inputDocument.addField(fieldName,DataTypeUtils.toDouble(coercedValue, 
fieldName));
    +                break;
    +            case FLOAT:
    +                
inputDocument.addField(fieldName,DataTypeUtils.toFloat(coercedValue, 
fieldName));
    +                break;
    +            case LONG:
    +                
inputDocument.addField(fieldName,DataTypeUtils.toLong(coercedValue, fieldName));
    +                break;
    +            case INT:
    +            case BYTE:
    +            case SHORT:
    +                
inputDocument.addField(fieldName,DataTypeUtils.toInteger(coercedValue, 
fieldName));
    +                break;
    +            case CHAR:
    +            case STRING:
    +                inputDocument.addField(fieldName,coercedValue.toString());
    +                break;
    +            case BIGINT:
    +                if (coercedValue instanceof Long) {
    +                    inputDocument.addField(fieldName,(Long) coercedValue);
    +                } else {
    +                    inputDocument.addField(fieldName,(BigInteger) 
coercedValue);
    +                }
    +                break;
    +            case BOOLEAN:
    +                final String stringValue = coercedValue.toString();
    +                if ("true".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,true);
    +                } else if ("false".equalsIgnoreCase(stringValue)) {
    +                    inputDocument.addField(fieldName,false);
    +                } else {
    +                    inputDocument.addField(fieldName,stringValue);
    +                }
    +                break;
    +            case RECORD: {
    --- End diff --
    
    How do nested records end up being represented in the Solr document? Not 
saying anything is wrong here, just asking to understand how it works.
    
    Lets say we have a person schema with top-level fields for "firstName" and 
"lastName", and "address", and the address field is of type record and then has 
it's own fields "street", "city", "zip"...  
    
    Does the resulting Solr document contain "firstName", "lastName", "street", 
"city", "zip"? 
    
    Would it make sense to have an option to include the parent field in the 
field names, so it ends up being "address_street", "address_city", and 
"address_zip" so that you know where those fields came from?


> Implement record-based Solr processors
> --------------------------------------
>
>                 Key: NIFI-4035
>                 URL: https://issues.apache.org/jira/browse/NIFI-4035
>             Project: Apache NiFi
>          Issue Type: Improvement
>    Affects Versions: 1.2.0, 1.3.0
>            Reporter: Bryan Bende
>            Priority: Minor
>
> Now that we have record readers and writers, we should implement variants of 
> the existing Solr processors that record-based...
> Processors to consider:
> * PutSolrRecord - uses a configured record reader to read an incoming flow 
> file and insert records to Solr
> * GetSolrRecord - extracts records from Solr and uses a configured record 
> writer to write them to a flow file



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to