Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2587#discussion_r179827839
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
 ---
    @@ -0,0 +1,502 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.xml;
    +
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.DataType;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.ArrayDataType;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +
    +import javax.xml.stream.XMLEventReader;
    +import javax.xml.stream.XMLInputFactory;
    +import javax.xml.stream.XMLStreamException;
    +import javax.xml.stream.events.Attribute;
    +import javax.xml.stream.events.Characters;
    +import javax.xml.stream.events.StartElement;
    +import javax.xml.stream.events.XMLEvent;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.text.DateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.function.Supplier;
    +
    +public class XMLRecordReader implements RecordReader {
    +
    +    private final ComponentLog logger;
    +    private final RecordSchema schema;
    +    private final String recordName;
    +    private final String attributePrefix;
    +    private final String contentFieldName;
    +
    +    // thread safety required?
    +    private StartElement currentRecordStartTag;
    +
    +    private final XMLEventReader xmlEventReader;
    +
    +    private final Supplier<DateFormat> LAZY_DATE_FORMAT;
    +    private final Supplier<DateFormat> LAZY_TIME_FORMAT;
    +    private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
    +
    +    /**
    +     * Creates a reader over the given XML stream and positions it on the first record start tag.
    +     *
    +     * <p>The first start element of the document is treated as the root tag. When {@code rootName}
    +     * is non-null, the root tag must carry exactly that name.</p>
    +     *
    +     * @param in the XML content to read
    +     * @param schema the schema handed to the record parser for every record
    +     * @param rootName expected name of the root tag, or {@code null} to skip root tag validation
    +     * @param recordName name of the tags that are treated as records, or {@code null} to treat
    +     *                   every start tag below the root as a record
    +     * @param attributePrefix stored for use by the parsing methods of this class
    +     * @param contentFieldName stored for use by the parsing methods of this class
    +     * @param dateFormat pattern passed to {@code DataTypeUtils.getDateFormat}, or {@code null} for none
    +     * @param timeFormat pattern passed to {@code DataTypeUtils.getDateFormat}, or {@code null} for none
    +     * @param timestampFormat pattern passed to {@code DataTypeUtils.getDateFormat}, or {@code null} for none
    +     * @param logger logger used to report skipped elements
    +     * @throws MalformedRecordException if the XML cannot be parsed, or if {@code rootName} is given
    +     *                                  and the document has no root tag or a root tag with a different name
    +     */
    +    public XMLRecordReader(InputStream in, RecordSchema schema, String rootName, String recordName, String attributePrefix, String contentFieldName,
    +                           final String dateFormat, final String timeFormat, final String timestampFormat, final ComponentLog logger) throws MalformedRecordException {
    +        this.schema = schema;
    +        this.recordName = recordName;
    +        this.attributePrefix = attributePrefix;
    +        this.contentFieldName = contentFieldName;
    +        this.logger = logger;
    +
    +        final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
    +        final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
    +        final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
    +
    +        LAZY_DATE_FORMAT = () -> df;
    +        LAZY_TIME_FORMAT = () -> tf;
    +        LAZY_TIMESTAMP_FORMAT = () -> tsf;
    +
    +        try {
    +            final XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
    +
    +            // Avoid namespace replacements
    +            xmlInputFactory.setProperty(XMLInputFactory.IS_NAMESPACE_AWARE, false);
    +
    +            // The stream content is supplied by the caller; do not resolve external entities (XXE)
    +            xmlInputFactory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
    +
    +            xmlEventReader = xmlInputFactory.createXMLEventReader(in);
    +            final StartElement rootTag = getNextStartTag();
    +
    +            // root tag validation
    +            if (rootName != null) {
    +                // getNextStartTag() returns null when the stream holds no start element at all
    +                if (rootTag == null) {
    +                    throw new MalformedRecordException("Could not find a root tag; expected root tag \"" + rootName + "\".");
    +                }
    +
    +                final String actualRootName = rootTag.getName().toString();
    +                if (!rootName.equals(actualRootName)) {
    +                    throw new MalformedRecordException("Name of root tag \"" + actualRootName
    +                            + "\" does not match root tag validation \"" + rootName + "\".");
    +                }
    +            }
    +            setNextRecordStartTag();
    +        } catch (XMLStreamException e) {
    +            throw new MalformedRecordException("Could not parse XML", e);
    +        }
    +    }
    +
    +    /**
    +     * Advances the event reader until a start element is reached.
    +     *
    +     * @return the next start element, or {@code null} if the reader is exhausted before one is found
    +     * @throws XMLStreamException if the underlying reader fails while fetching an event
    +     */
    +    private StartElement getNextStartTag() throws XMLStreamException {
    +        StartElement found = null;
    +        while (found == null && xmlEventReader.hasNext()) {
    +            final XMLEvent candidate = xmlEventReader.nextEvent();
    +            if (candidate.isStartElement()) {
    +                found = candidate.asStartElement();
    +            }
    +        }
    +        return found;
    +    }
    +
    +    /**
    +     * Moves the reader to the start tag of the next record and stores it in
    +     * {@code currentRecordStartTag}.
    +     *
    +     * <p>When {@code recordName} is {@code null}, the next start tag is accepted as a record.
    +     * Otherwise start tags with a different name are logged at debug level and skipped via
    +     * {@code skipElement()}. If the reader runs out of events, {@code currentRecordStartTag}
    +     * is set to {@code null}.</p>
    +     *
    +     * @throws XMLStreamException if the underlying reader fails while fetching an event
    +     */
    +    private void setNextRecordStartTag() throws XMLStreamException {
    +        while (xmlEventReader.hasNext()) {
    +            final XMLEvent event = xmlEventReader.nextEvent();
    +            if (!event.isStartElement()) {
    +                continue;
    +            }
    +
    +            final StartElement candidate = event.asStartElement();
    +            final boolean isRecordTag = recordName == null || candidate.getName().toString().equals(recordName);
    +            if (isRecordTag) {
    +                currentRecordStartTag = candidate;
    +                return;
    +            }
    +
    +            // Not a record tag: report it and consume the whole element before looking further
    +            logger.debug("Mismatch between expected record tag name {} and actual tag name in XML {}. " +
    +                    "Record will be skipped", new Object[] {recordName, candidate.getName().toString()});
    +            skipElement();
    +        }
    +
    +        // No further start tag available
    +        currentRecordStartTag = null;
    +    }
    +
    +    /**
    +     * Parses the record at the current record start tag and advances to the next record start tag.
    +     *
    +     * @param coerceTypes passed through to the record parser
    +     * @param dropUnknownFields passed through to the record parser
    +     * @return the parsed record; a record with no values if the parser produced {@code null};
    +     *         or {@code null} when there is no further record start tag
    +     * @throws IOException declared by the {@code RecordReader} contract
    +     * @throws MalformedRecordException if the XML cannot be parsed
    +     */
    +    @Override
    +    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
    +        if (currentRecordStartTag == null) {
    +            return null;
    +        }
    +        try {
    +            final Record record = parseRecord(currentRecordStartTag, this.schema, coerceTypes, dropUnknownFields);
    +            setNextRecordStartTag();
    +            if (record != null) {
    +                return record;
    +            }
    +            // Collections.emptyMap() is the type-safe form of the raw Collections.EMPTY_MAP constant
    +            return new MapRecord(this.schema, Collections.emptyMap());
    +        } catch (XMLStreamException e) {
    +            throw new MalformedRecordException("Could not parse XML", e);
    +        }
    +    }
    +
    +    private Object parseFieldForType(StartElement startElement, String 
fieldName, DataType dataType, Map<String, Object> recordValues) throws 
XMLStreamException, MalformedRecordException {
    +        switch (dataType.getFieldType()) {
    +            case BOOLEAN:
    +            case BYTE:
    +            case CHAR:
    +            case DOUBLE:
    +            case FLOAT:
    +            case INT:
    +            case LONG:
    +            case SHORT:
    +            case STRING:
    +            case DATE:
    +            case TIME:
    +            case TIMESTAMP: {
    +                XMLEvent xmlEvent = xmlEventReader.nextEvent();
    +                if (xmlEvent.isCharacters()) {
    +                    final Characters characters = xmlEvent.asCharacters();
    +                    if (!characters.isWhiteSpace()) {
    +                        xmlEventReader.nextEvent();
    +                        return 
DataTypeUtils.convertType(characters.toString(), dataType, LAZY_DATE_FORMAT, 
LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
    +                    } else {
    +                        xmlEvent = xmlEventReader.nextEvent();
    --- End diff --
    
    In this case, we are moving on to the next event. We then check whether 
this next event is either an endElement or a startElement. If it is neither of 
those, we fall through to the ARRAY case statement. We need to be sure to 
address the 'else' case here.


---

Reply via email to